시작이 반

[Kotlin] Coroutine 공유 데이터 문제 해결하기 본문

Programming/Kotlin

[Kotlin] Coroutine 공유 데이터 문제 해결하기

G_Gi 2023. 5. 15. 16:55
SMALL

Kotlin은 비동기 작업을 다룰 수 있는 동시성 라이브러리 이다.

 

동시성에는 잘못된 사용으로 여러 문제를 발생시킬 수 있는데 공유 데이터가 그중 하나이다.

 

여러 코루틴이 동시에 공유된 변수나 자료구조에 접근 하는 경우 데이터 경합이 발생할 수 있으며 이럴 때 해결할 수 있는 방법에 대해서 알아보자.

 

문제 코드

import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.system.measureTimeMillis

suspend fun massiveRun(action: suspend () -> Unit){
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안  ${n * k}개의 액션을 수행했습니다.")
}
var counter = 0


//withContext는 수행이 완료될 때까지 기다리는 코루틴 빌더, 겉에 코루틴이 잠듦
//Dispatcher에 의해 어떻게 코루틴이 할당되었느냐에 따라서 값이 달라짐 > Default이기 때문에 다른 쓰레드에서 실행
fun main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun {
            counter++
        }
    }

    println("Counter = $counter")
}



/////결과
21 ms동안  100000개의 액션을 수행했습니다.
Counter = 47836

Counter가 100000 가 되길 바라지만 정상적으로 값이 나오지 않았다.

 

 

1. Volatile

@Volatile 어노테이션을 사용하여 가시성 문제를 해결할 수 있다.

(가시성 문제란?

프로그램의 변수는 캐시나 레지스터에 저장이 되는데 여러 스레드에서 다른 변수를 볼때 값을 올바르게 보지 못하는 문제이다.)

 

@Volatile은 변수를 메모리 가시성(memory visibility) 관점에서 동기화하는 데 사용되는 키워드이다. @Volatile 키워드가 지정된 변수는 쓰기 연산이 이루어지면 해당 값을 메인 메모리에 즉시 반영하고, 읽기 연산 시에는 항상 메인 메모리에서 값을 읽어오며, 변수의 가시성 문제를 해결할 수 있다.

import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.system.measureTimeMillis

suspend fun massiveRun(action: suspend () -> Unit){
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안  ${n * k}개의 액션을 수행했습니다.")
}

// but 다른 쓰레드에서 현재 값을 바꿀 수 있기 때문에 동시에 읽고 수정하는 문제 해결x
@Volatile
var counter = 0


fun main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun {
            counter++
        }
    }

    println("Counter = $counter")
}


////////////결과
18 ms동안  100000개의 액션을 수행했습니다.
Counter = 63127

하지만 해당 문제는 해결되지 않았다.

가시성 문제는 해결이 되었지만 원자성 문제는 해결이 되지 않았기 때문이다.

 

 

2.원자적 연산을 지원하는 동기화 메커니즘(예: AtomicInteger, AtomicBoolean, AtomicReference 등) 사용

쓰레드에 안전한 구조로 값을 변경할 때 다른 쓰레드에서 관여하지 못한다.

import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

suspend fun massiveRun2(action: suspend () -> Unit){
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안  ${n * k}개의 액션을 수행했습니다.")
}

//해결2 : 쓰레드 안전한 구조 값을 변경할때 다른 쓰레드가 관여하지 못함
val counter2 = AtomicInteger()

fun main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun2 {
            counter2.incrementAndGet()
        }
    }

    println("Counter = $counter2")
}

///////////결과
18 ms동안  100000개의 액션을 수행했습니다.
Counter = 100000

 

 

3. newSingleThreadContext 특정 쓰레드를 만들어서 해당 쓰레드에서만 가능하도록 한정한다.

import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

suspend fun massiveRun3(action: suspend () -> Unit){
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안  ${n * k}개의 액션을 수행했습니다.")
}

var counter3 = 0
//해결3 : 쓰레드 한정을 이용, 특정 쓰레드를 하나 만들어서 그 쓰레드만 사용
val counterContext = newSingleThreadContext("CounterContext")

fun main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun3 {
            withContext(counterContext){ // 더하는 코드를 하나의 스레드에서
                counter3++
            }
        }
    }

    println("Counter = $counter3")
}

//////결과
778 ms동안  100000개의 액션을 수행했습니다.
Counter = 100000

하지만 해당 방법은 해당 컨텍스트 내에서는 동시성 문제를 해결히지만, 외부 공유 변수에 대해서는 해결하지 못한다.

 

4. Mutex사용하기

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

suspend fun massiveRun4(action: suspend () -> Unit){
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안  ${n * k}개의 액션을 수행했습니다.")
}

//Mutex 공유 상태를 수정할 때 임계 영역을 이용하게 하여, 동시접근 허용x
val mutex = Mutex()
var counter4 = 0

fun main() = runBlocking{
    withContext(Dispatchers.Default){
        massiveRun4 {
            mutex.withLock { //하나의 스레드만 들어가고 나머지 스레드는 기다림
                counter4++
            }
        }
    }

    println("Counter = $counter4")
}

/////결과
263 ms동안  100000개의 액션을 수행했습니다.
Counter = 100000

Mutex를 사용하여 가시성과 원자성 모두 해결 가능하다.

 

5. actor 사용

actor는 코루틴과 메세지 전달을 기반으로 동작하며 상태와 메세지를 가지는 독립적인 개체로, 다른 액터와 메세지를 주고받으면서 동작한다. 각 액터는 내부의 상태를 변경하거나 외부와 상호작용할 수 있으며, 메세지는 액터 간에 비동기적으로 전달한다. 순차적으로 메세지를 처리하고, 상태를 변경하기 때문에 상호작용하는 동안 다른 액터의 영향을 받지 않는다.

 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

suspend fun massiveRun5(action: suspend () -> Unit){
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안  ${n * k}개의 액션을 수행했습니다.")
}

//액터가 독점적으로 자료를 가지며 그 자료를 다른 코루틴과 공유하지 않고 액터를 통해서만 접근

//sealed class : 외부 확장 불가능 클래스
//액터에게 보내기 위해 만들어진 객체
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun CoroutineScope.counterActor() = actor<CounterMsg>{
    var counter = 0 // 액터 안에 상태를 캡슐화해두고 다른 코루틴이 접근하지 못하게함

    for (msg in channel){ //channel 은 외부에서 보내는 데이터를 받음
        when(msg){
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter) //현재 상태 반환
        }
    }
}

//channel을 통해서 시그널을 보내야함
fun main() = runBlocking<Unit>{
    val counter = counterActor()
    withContext(Dispatchers.Default){
        massiveRun4 {
            counter.send(IncCounter)
        }
    }

    val response = CompletableDeferred<Int>()//값을 여기에 담음
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close()
}

메세지는 보통 sealed class 로 정의되며 이는 외부 확장이 불가능한 클래스이다.

누가 나를 상속하였는지 알 수 있음

object 객체는 싱글턴으로 객체를 생성해준다.

 

fun CoroutineScope.counterActor() = actor<CounterMsg>{ ... }

counterActor 함수에서 actor<CounterMsg> { ... } 는 Channel을 생성하고 해당 채널로부터 메세지를 수신하는 액터를 생성하는 코드이다. 

for (msg in channel){ ... }

for문을 통해 채널로부터 메세지를 수신한다.

 

when(msg){
    is IncCounter -> counter++
    is GetCounter -> msg.response.complete(counter) //현재 상태 반환
}

when 절을 이용하여 각 메세지마다 어떤 행동을 할지 정한다. ( 카운트 증가, 카운트 반환)

 

메세지는 send함수를 통해서 전달할 수 있다.

LIST