일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 백준 16235
- Spring Boot
- MySQL
- spring oauth
- java 기술면접
- 백준 16719
- 백준 16236
- java
- 백준 파이썬
- 백준
- with recursive
- 백준 19238
- 백준 15685
- spring security
- sql 기술면접
- 파이썬
- 백준 17626
- Coroutine
- 프로그래머스
- 백준 17779
- JVM
- re.split
- Spring
- spring cloud
- MSA
- 웹어플리케이션 서버
- 프로래머스
- springboot
- Kotlin
- JPA
- Today
- Total
시작이 반
[Kotlin] Coroutine 공유 데이터 문제 해결하기 본문
Kotlin은 비동기 작업을 다룰 수 있는 동시성 라이브러리 이다.
동시성에는 잘못된 사용으로 여러 문제를 발생시킬 수 있는데 공유 데이터가 그중 하나이다.
여러 코루틴이 동시에 공유된 변수나 자료구조에 접근 하는 경우 데이터 경합이 발생할 수 있으며 이럴 때 해결할 수 있는 방법에 대해서 알아보자.
문제 코드
import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.system.measureTimeMillis
suspend fun massiveRun(action: suspend () -> Unit){
val n = 100
val k = 1000
val elapsed = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}
var counter = 0
//withContext는 수행이 완료될 때까지 기다리는 코루틴 빌더, 겉에 코루틴이 잠듦
//Dispatcher에 의해 어떻게 코루틴이 할당되었느냐에 따라서 값이 달라짐 > Default이기 때문에 다른 쓰레드에서 실행
fun main() = runBlocking{
withContext(Dispatchers.Default){
massiveRun {
counter++
}
}
println("Counter = $counter")
}
/////결과
21 ms동안 100000개의 액션을 수행했습니다.
Counter = 47836
Counter가 100000 가 되길 바라지만 정상적으로 값이 나오지 않았다.
1. Volatile
@Volatile 어노테이션을 사용하여 가시성 문제를 해결할 수 있다.
(가시성 문제란?
프로그램의 변수는 캐시나 레지스터에 저장이 되는데 여러 스레드에서 다른 변수를 볼때 값을 올바르게 보지 못하는 문제이다.)
@Volatile은 변수를 메모리 가시성(memory visibility) 관점에서 동기화하는 데 사용되는 키워드이다. @Volatile 키워드가 지정된 변수는 쓰기 연산이 이루어지면 해당 값을 메인 메모리에 즉시 반영하고, 읽기 연산 시에는 항상 메인 메모리에서 값을 읽어오며, 변수의 가시성 문제를 해결할 수 있다.
import kotlinx.coroutines.*
import kotlin.random.Random
import kotlin.system.measureTimeMillis
suspend fun massiveRun(action: suspend () -> Unit){
val n = 100
val k = 1000
val elapsed = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}
// but 다른 쓰레드에서 현재 값을 바꿀 수 있기 때문에 동시에 읽고 수정하는 문제 해결x
@Volatile
var counter = 0
fun main() = runBlocking{
withContext(Dispatchers.Default){
massiveRun {
counter++
}
}
println("Counter = $counter")
}
////////////결과
18 ms동안 100000개의 액션을 수행했습니다.
Counter = 63127
하지만 해당 문제는 해결되지 않았다.
가시성 문제는 해결이 되었지만 원자성 문제는 해결이 되지 않았기 때문이다.
2.원자적 연산을 지원하는 동기화 메커니즘(예: AtomicInteger, AtomicBoolean, AtomicReference 등) 사용
쓰레드에 안전한 구조로 값을 변경할 때 다른 쓰레드에서 관여하지 못한다.
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis
suspend fun massiveRun2(action: suspend () -> Unit){
val n = 100
val k = 1000
val elapsed = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}
//해결2 : 쓰레드 안전한 구조 값을 변경할때 다른 쓰레드가 관여하지 못함
val counter2 = AtomicInteger()
fun main() = runBlocking{
withContext(Dispatchers.Default){
massiveRun2 {
counter2.incrementAndGet()
}
}
println("Counter = $counter2")
}
///////////결과
18 ms동안 100000개의 액션을 수행했습니다.
Counter = 100000
3. newSingleThreadContext 특정 쓰레드를 만들어서 해당 쓰레드에서만 가능하도록 한정한다.
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis
suspend fun massiveRun3(action: suspend () -> Unit){
val n = 100
val k = 1000
val elapsed = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}
var counter3 = 0
//해결3 : 쓰레드 한정을 이용, 특정 쓰레드를 하나 만들어서 그 쓰레드만 사용
val counterContext = newSingleThreadContext("CounterContext")
fun main() = runBlocking{
withContext(Dispatchers.Default){
massiveRun3 {
withContext(counterContext){ // 더하는 코드를 하나의 스레드에서
counter3++
}
}
}
println("Counter = $counter3")
}
//////결과
778 ms동안 100000개의 액션을 수행했습니다.
Counter = 100000
하지만 해당 방법은 해당 컨텍스트 내에서는 동시성 문제를 해결히지만, 외부 공유 변수에 대해서는 해결하지 못한다.
4. Mutex사용하기
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis
suspend fun massiveRun4(action: suspend () -> Unit){
val n = 100
val k = 1000
val elapsed = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}
//Mutex 공유 상태를 수정할 때 임계 영역을 이용하게 하여, 동시접근 허용x
val mutex = Mutex()
var counter4 = 0
fun main() = runBlocking{
withContext(Dispatchers.Default){
massiveRun4 {
mutex.withLock { //하나의 스레드만 들어가고 나머지 스레드는 기다림
counter4++
}
}
}
println("Counter = $counter4")
}
/////결과
263 ms동안 100000개의 액션을 수행했습니다.
Counter = 100000
Mutex를 사용하여 가시성과 원자성 모두 해결 가능하다.
5. actor 사용
actor는 코루틴과 메세지 전달을 기반으로 동작하며 상태와 메세지를 가지는 독립적인 개체로, 다른 액터와 메세지를 주고받으면서 동작한다. 각 액터는 내부의 상태를 변경하거나 외부와 상호작용할 수 있으며, 메세지는 액터 간에 비동기적으로 전달한다. 순차적으로 메세지를 처리하고, 상태를 변경하기 때문에 상호작용하는 동안 다른 액터의 영향을 받지 않는다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis
suspend fun massiveRun5(action: suspend () -> Unit){
val n = 100
val k = 1000
val elapsed = measureTimeMillis {
coroutineScope {
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")
}
//액터가 독점적으로 자료를 가지며 그 자료를 다른 코루틴과 공유하지 않고 액터를 통해서만 접근
//sealed class : 외부 확장 불가능 클래스
//액터에게 보내기 위해 만들어진 객체
sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun CoroutineScope.counterActor() = actor<CounterMsg>{
var counter = 0 // 액터 안에 상태를 캡슐화해두고 다른 코루틴이 접근하지 못하게함
for (msg in channel){ //channel 은 외부에서 보내는 데이터를 받음
when(msg){
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter) //현재 상태 반환
}
}
}
//channel을 통해서 시그널을 보내야함
fun main() = runBlocking<Unit>{
val counter = counterActor()
withContext(Dispatchers.Default){
massiveRun4 {
counter.send(IncCounter)
}
}
val response = CompletableDeferred<Int>()//값을 여기에 담음
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close()
}
메세지는 보통 sealed class 로 정의되며 이는 외부 확장이 불가능한 클래스이다.
누가 나를 상속하였는지 알 수 있음
object 객체는 싱글턴으로 객체를 생성해준다.
fun CoroutineScope.counterActor() = actor<CounterMsg>{ ... }
counterActor 함수에서 actor<CounterMsg> { ... } 는 Channel을 생성하고 해당 채널로부터 메세지를 수신하는 액터를 생성하는 코드이다.
for (msg in channel){ ... }
for문을 통해 채널로부터 메세지를 수신한다.
when(msg){
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter) //현재 상태 반환
}
when 절을 이용하여 각 메세지마다 어떤 행동을 할지 정한다. ( 카운트 증가, 카운트 반환)
메세지는 send함수를 통해서 전달할 수 있다.
'Programming > Kotlin' 카테고리의 다른 글
[Kotlin] SpringBoot + Coroutine 성능테스트 (1) | 2023.06.06 |
---|---|
[Kotlin] Coroutine을 사용하여 API 병렬 처리 (0) | 2023.04.30 |
[Kotlin] CoroutineScope란? (1) | 2023.04.27 |
[Kotlin] 코틀린 문법 (0) | 2022.09.06 |