병렬처리하는 법을 알아보자.
예를 들어보자.
[1, 2, 3, 4, 5 ,6, 7, 3, 32, 12, 23, 18, 10] 리스트의 합계를 구해보자.
fold를 사용해 계산한다고 하면, 이렇게 될 것이다.
1 + 2 + ...
3 + 4 + ...
7 + 5 + ...
12 + 6 + ...
18 + 7 + ...
25 + 3 + ...
28 + 32 + ...
60 + 12 + ...
72 + 23 + ...
95 + 18 + ...
113 + 10
123
총 시간은 O(n)만큼 걸린다.
분할 정복을 사용해보자.
[1, 2, 3, 4, 5 ,6, 7,]
[3, 32, 12, 23, 18, 10]
두 리스트의 합으로 볼 수 있다.
더 잘게 쪼개면
[1, 2, 3, 4]
[5 ,6, 7,]
[3, 32, 12]
[23, 18, 10]
이렇게 계속 쪼갤 수 있다.
쪼갠 각 리스트의 합을 모두 더하면 전체 합을 구할 수 있죠.
자 그럼 이제 병렬 처리할 단위를 구할 수 있습니다.
쪼갠 리스트를 각 스레드에 할당해서 계산하면 되겠죠~
사실 이번 장에서는 병렬처리를 어떻게 구현하느냐는 관심없스요.
그냥 병렬처리를 사용하는 함수형 API를 만드는 것이 목적인 것이요.
위에서의 예제를 코드로 봐볼게요.
fun sum(ints: List<Int>): Int =
if (ints.size <= 1) {
ints.firstOrNull() ?: 0
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
sum(l) + sum(r)
}
sum(l) + sum(r)
에서 각 계산을 병렬로 처리해야하죠! 근데 지금은 sum(l)
만 주구장창 계산하다. sum(r)
을 계산하기 시작할거에요.
그래서 이건 병렬 계산이에요~ 라는 걸 알려줄 자료형을 정해봐요.
Parallel을 줄여서 Par
이라고 할게요.
class Par<ITEM>(val get: ITEM)
fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())
fun <ITEM> get(item: Par<ITEM>): ITEM = item.get
병렬계산을 하고 계산 결과를 item에 담는 자료형을 만들었습니다.
예제에 Par
을 써보죠
fun sum2(ints: List<Int>): Int =
if (ints.size <= 1) {
ints.firstOrNull() ?: 0
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
val sumL: Par<Int> = unit { sum2(l) }
val sumR: Par<Int> = unit { sum2(r) }
sumL.get + sumR.get
}
sumL.get + sumR.get
가 호출되기 전까지는 sum(l)
, sum(r)
가 평가되지 않는다.
하지만 get을 호출하면 sum(l)
이 모두 평가되고 난 후 sum(r)
이 순차적으로 평가 될 것이다.
병렬처리를 하고싶어 만들었지만 실제로는 순차처리되는 것이다..
위 코드에서 sumL
과 sumR
을 치환해보면 참조투명성을 깨는 것을 볼 수 있다.
unit { sum(l) }.get + unit { sum(r) }.get
결과는 똑같겠지만, unit { sum(l) }
이 모두 평가되어야 unit { sum(r) }
이 초기화된다.
위에서 본 것과는 차이가 있다.
다만 get
을 사용하지 않는다면 이런 참조 투명성을 해치는 부수효과가 없다.
이 부수효과를 해결하기 위해서 get 호출을 최대한 미뤄야한다.
어떻게 해야할까.
일단 get
호출을 하지 않는다는 것을 전제로하자.
그럼 sum
함수는 Par<Int>
를 반환하는 게 맞다.
fun sum3(ints: List<Int>): Par<Int> =
if (ints.size <= 1) {
unit { ints.firstOrNull() ?: 0 }
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
map2(sum3(l), sum3(r)) { lx: Int, rx: Int -> lx + rx }
}
map2
함수를 만들어서 구현했다.
fun <ITEM1, ITEM2, RESULT> map2(item1: Par<ITEM1>, item2: Par<ITEM2>, block: (ITEM1, ITEM2) -> RESULT): Par<RESULT> =
unit { block(item1.get, item2.get) }
이렇게 정의할 수 있다.
재귀적인 경우 더이상 unit을 호출하지 않는다.
자 그럼 이제 논의할 것은 map2
의 인자를 지연 계산으로 받을 것이냐이다.
map2
가 엄격한 인자를 사용할 때 문제점은 다음과 같다.
sum(listOf(1,2,3,4))
map2(
sum(listOf(1, 2)),
sum(listOf(3, 4))
)
map2(
map2(
listOf(1),
listOf(2)
),
sum(listOf(3, 4))
)
map2(
map2(
listOf(1),
listOf(2)
),
map2(
listOf(3),
listOf(4)
),
)
이렇게 왼쪽을 완전히 계산한 다음 오른쪽을 계산하게 된다.
두 파라미터를 동시에 병렬 계산하지 않는 것이다.
두 파라미터를 동시에 병렬계산 하도록 하려면 map2
를 지연계산으로 만들어야한다.
이렇게 결정하고
다음으로 볼 것은 map2
는 항상 지연계산일 필요가 있냐는 것이다.
map2(
unit { 1 },
unit { 2 }
)
이 경우에는 굳이 병렬로 할 필요가 없다. 오히려 성능이 안좋아진다.
스레드 분기를 명확하게 만들어주자.
다른 스레드에서 실행돼요~ 라는 의미를 가지는 fork
함수를 만들겠어요.
fun sum4(ints: List<Int>): Par<Int> =
if (ints.size <= 1) {
lazyUnit { ints.firstOrNull() ?: 0 }
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int ->
lx + rx }
}
그렇다면 인자가 적으면 별도의 스레드를 사용하지 않겠다. 는 어떻게 할 수 있을까.
fun sum4(ints: List<Int>): Par<Int> =
if (ints.size <= 1) {
lazyUnit { ints.firstOrNull() ?: 0 }
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
if (l.size <= 2) {
map2(sum4(l), fork { sum4(r) }) { lx: Int, rx: Int ->
lx + rx }
} else {
map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int ->
lx + rx }
}
}
이렇게하는 걸까.
자. 어쨌든 fork
를 써서 "별도의 스레드에서 실행하겠어"라는 것을 만들어줬다.
그럼 unit은 지연 계산할 필요가 있을까?
fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())
unit
은 이렇게 만들었다.
앞에서 fork
를 만들었으니, unit
을 지연 계산하려면, fork
를 사용하는 방법을 생각해 볼 수 있다.
그러면 엄격한 unit
과 엄격하지 않은 unit
을 만들 수 있다.
fun <ITEM> unit(item: ITEM): Par<ITEM> = Par(item)
fun <ITEM> lazyUnit(item: () -> ITEM): Par<ITEM> =
fork { unit(item()) }
unit
같은 기본형 콤비네이터로, lazyUnit
같은 파생형 콤비네이터를 만들어냈다.
스레드 분기는 언제해야할까?
- fork를 호출했을 때 해야할까?
- get으로 평가를 했을 때 스레드를 분기해야 할까?
- fork에 스레드를 분기 책임이 있는 경우
- fork의 구현은 스레드를 생성하고, 작업을 스레드 풀에 제출하는 방법을 알아야함
- => 스레드풀 같은 곳에 전역적으로 접근이 가능해야함
- => fork를 호출하는 시점에 스레드풀 같은 자원이 초기화 되어있어야함
- 이는 프로그램에 여러 부분에서 원하는 대로 병렬성 전략을 제어할 수 있는 능력을 잃는다는 것
- *get이 스레드 생성과 실행할 작업을 스레드 풀에 제출하는 책임을 가지는 게 더 적합하다.
이제 fork를 통해서 병렬계산을하라고 '표시' 할 수 있고, Par는 해당 계산이 병렬 계산이라는 것을 알려주는 자료형이다.
사실상 fork와 Par는 병렬처리의 내부구현에 대해 알 필요가 없다.
Par는 그냥 값을 담는 컨테이너로만 생각하면 된다.
fun <ITEM> run(item: Par<ITEM>): ITEM = TODO()
run이란 함수를 만들어서 run이 스레드를 시작하거나, 스레드 풀에 제출하거나 하는 등의 방법을 통해 병렬성을 구현하게 한다.
병렬 처리는 어떻게 해야할까?
java.util.concurrent.ExecutorService
클래스를 사용하자.
interface Callable<ITEM> {
fun call(): ITEM
}
interface Future<ITEM> {
fun get(): ITEM
fun get(timeout: Long, timeUnit: TimeUnit): ITEM
fun cancel(evenIfRunning: Boolean): Boolean
fun isDone(): Boolean
fun isCancelled(): Boolean
}
interface ExecutorService {
fun <ITEM> submit(c: Callable<ITEM>): Future<ITEM>
}
대충 코틀린으로 찌끄려보면 이렇다.
ExecutorService
에는 Callable
을 제출할 수 있다.
Future
는 새로운 드레드에서 실행될 수도 있는 계산을 가리키는 핸들이다.
Future
의 블록킹 메소드인 get
을 호출해서 결과를 얻어올 수 있다.
ExecutorService를 사용해서 run을 구현해보자.
typealias Par<ITEM> = (ExecutorService) -> Future<ITEM>
fun <ITEM> run(es: ExecutorService, item: Par<ITEM>): Future<ITEM>
= item(es)
Par
타입을 새로 정의하고 ExecutorService
를 전달하도록 했다.
ExcutorSevice
를 전달해야만 Future
가 생긴다.
이제 우리가 만든 API를 탐구하고 다듬어보자.
object Pars {
fun <ITEM> unit(item: ITEM): Par<ITEM> = { es -> UnitFuture(item) }
data class UnitFuture<ITEM>(val item: ITEM) : Future<ITEM> {
override fun get(): ITEM = item
override fun get(timeout: Long, timeUnit: TimeUnit): ITEM = item
override fun cancel(evenIfRunning: Boolean): Boolean = false
override fun isDone(): Boolean = true
override fun isCancelled(): Boolean = false
}
fun <ITEM1, ITEM2, RESULT> map2(
item1: Par<ITEM1>,
item2: Par<ITEM2>,
block: (ITEM1, ITEM2) -> RESULT
): Par<RESULT> = { es ->
val future1 = item1(es)
val future2 = item2(es)
UnitFuture(block(future1.get(), future2.get()))
}
fun <ITEM> fork(
item: () -> Par<ITEM>
): Par<ITEM> = { es: ExecutorService ->
es.submit(object : Callable<ITEM> {
override fun call(): ITEM = item()(es).get()
})
}
}
이렇게 Future
의 구현체로 UnitFuture
를 만들고 map2
와 fork
를 새로운 자료형을 통해 정의했다.
여기서 map2
는 block()
의 호출을 별도의 스레드에서 하지 않는다.
병렬성을 제어하려면 fork
를 사용해야한다.
block()
의 평가를 별도의 스레드에서 수행하고 싶다면 fork
로 감싸주자.
Pars.fork {
Pars.map2(Pars.unit(1), Pars.unit(2)) { item1, item2 ->
item1 + item2
}
}
문제점
map2
구현은 타임아웃을 준수하지 않는다.
fork
구현에서 Callable
은 내부 작업이 완료될 때 까지 블록된다.
Future
에는 순수함수형 인터체이스가 없다.
- 고로 사용자가
Future
를 직접 다루지 못하게 한다.
- 하지만,
Future
에 부수효과가 있더라도, 우리가 만든 Par
API는 순수함수형으로 남는다. ??
연습문제 7.3
Future의 타임아웃을 준수하도록 map2 구현을 수정하라
fun <ITEM1, ITEM2, RESULT> map2_timeout(
item1: Par<ITEM1>,
item2: Par<ITEM2>,
block: (ITEM1, ITEM2) -> RESULT
): Par<RESULT> = { es ->
val future1 = item1(es)
val future2 = item2(es)
TimedMap2Future(future1, future2, block)
}
data class TimedMap2Future<ITEM1, ITEM2, RESULT>(
val item1: Future,
val item2: Future,
val block: (ITEM1, ITEM2) -> RESULT
) : Future {
override fun get(): RESULT {
TODO("Not yet implemented")
}
override fun get(timeout: Long, timeUnit: TimeUnit): RESULT {
val timeoutMillis = TimeUnit.MICROSECONDS.convert(timeout, timeUnit)
val start = System.currentTimeMillis()
val a = item1.get(timeout, timeUnit)
val duration = System.currentTimeMillis() - start
val remainder = timeoutMillis - duration
val b = item2.get(remainder, TimeUnit.MICROSECONDS)
return block(a, b)
}
override fun cancel(evenIfRunning: Boolean): Boolean {
TODO("Not yet implemented")
}
override fun isDone(): Boolean {
TODO("Not yet implemented")
}
override fun isCancelled(): Boolean {
TODO("Not yet implemented")
}
}
- **연습문제 7.4**
- lazyUnit을 사용해 (A) -> B 타입의 임의의 함수를 비동기적으로 결과를 평가하는 함수로 변환하는 asyncF를 작성하라.
```kotlin
fun <ITEM, RESULT> asyncF(f: (ITEM) -> RESULT): (ITEM) -> Par<RESULT> =
{ item ->
lazyUnit { f(item) }
}
기존 콤비네이터로 더 구현해보자.
sortPar
Par.run()
해서 평가하고, 결과 리스트를 받아 정렬한 다음 unit
을 이용해 다시 감싸기
map2
를 이용해 내부 리스트에 접근하기
fun sortPar(parList: Par<List<Int>>): Par<List<Int>> =
map2(parList, unit(Unit)) { list, _ -> list.sorted() }
map2를 통해서만 내부의 값을 바꿀 수 있다.
- 그러니 우선 map2를 써야겠다.
- 하나의 인자만 필요하니 다른 건 빈값을 넣자.
이제는 Par<A>
타입을 받아 Par<B>
를 반환하는 map
연산을 만들 수 있다.
fun <ITEM, RESULT> map(pa: Par<ITEM>, block: (ITEM) -> RESULT): Par<RESULT> = map2(pa, unit(Unit)) { a, _ -> block(a) }
이제 다시, map
으로 sortPar
를 구현해보자.
fun sortPar2(parList: Par<List<Int>>): Par<List<Int>> = map(parList) { it.sorted() }
map2
로 map
을 구현하였다.
리스트에 대해 병렬로 map
을 수행해보자.
fun <ITEM, RESULT> parMap(
list: List<ITEM>,
block: (ITEM) -> RESULT
): Par<List<RESULT>> {
val fbs: List<Par<RESULT>> = list.map(asyncF(block))
TODO()
}
val List.tail: List
get() = this.drop(1)
val Nil = listOf()
fun sequence1(ps: List): Par =
when (ps) {
Nil -> unit(Nil)
else -> map2(
ps.head,
sequence1(ps.tail)
) { a: ITEM, b: List ->
listOf(a) + b
}
}
fun sequence(ps: List): Par =
when {
ps.isEmpty() -> unit(Nil)
ps.size == 1 -> map(ps.head) { listOf(it) }
else -> {
val l = ps.subList(0, ps.size / 2)
val r = ps.subList(ps.size / 2, ps.size)
map2(sequence(l), sequence(r)) { la, lb ->
la + lb
}
}
}
- 이렇게 2가지 버전으로 기본연산 `sequence`를 만들었다.
- 이제 `parMap`을 구현할 수 있다.
```kotlin
fun <ITEM, RESULT> parMap(
list: List<ITEM>,
block: (ITEM) -> RESULT
): Par<List<RESULT>> {
val fbs: List<Par<RESULT>> = list.map(asyncF(block))
return sequence1(fbs)
}
연습문제 7.6
리스트의 원소를 병렬로 걸러내는 parFilter 구현
fun <ITEM> parFilter(
list: List<ITEM>,
block: (ITEM) -> Boolean
): Par<List<ITEM>> {
val pars: List<Par<ITEM>> = list.map { lazyUnit { it } }
return map(sequence(pars)) { la ->
la.flatMap { a ->
if (block(a)) listOf(a) else emptyList()
}
}
}
정의한 API의 법칙을 형식화해보자.
- 매핑(mapping)
- 논리 스레드 분기(forking)
두 가지에 대해 알아보자.
매핑 법칙
map(unit(1)) { it + 1 } == unit(2)
이 식은 unit(1)
을 { i + 1 }
로 매핑한 것이 unit(2)
와 같음을 뜻한다.
Par
객체가 동치라는 것은 모든 올바른 ExecutorService
에 대해 두 Par
객체가 내놓는Future
의 결과가 같다는 뜻이다.
이 함수를 일반화 해보자.
map(unit(x), f) == unit(f(x))
이 법칙은 우리에게 몇가지 제약을 준다.
- unit 구현은 자신이 받는 값을 들여다 볼 수 없다. 그저 전달할 뿐이다.
ExecutorService
에 Callable
을 전달할 때도 비슷했다. Callable
객체에 대한 어떤 가정도 할 수 없고 Callable
에 따라 어떤 분기도 할 수 없다. 그저 실행할 뿐이다.
- map과 unit의 구현에서 다운캐스팅을 하거나 타입캐스팅을 금지한다.
앞서 본 규칙을 더 단순화해보자.
- 어떤 x와 f를 사용하든 법칙이 성립해야한다.
- f가 항등함수라면 어떨까?
fun <A> id(a: A):A = a
val x = 1
val y = unit(x)
val f = { a: Int -> a + 1 }
val id = { a: Int -> a }
// 초기 법칙
map(unit(x), f) == unit(f(x))
// f를 id로 바꿔보자.
map(unit(x), id) == unit(id(x))
map(unit(x), id) == unit(x) // id(x) == x
map(y, id) == y // unit(x) == y
- unit에 대한 언급 없이 map에 대해서만 성립하는 법칙을 만들었다.
- map이 할 수 있는 일은 f의 결과를 y에 적용시키는 것이다.
- map은 함수를 적용시키기 전에 예외를 던지거나 비정상 종료를 할 수 없다.
- f가 항등함수인 id라면 y는 영향받지 않고 반환돼야한다.
- 이를
map이 구조보존적이어야 한다
라고 말한다.
논리 스레드 분기의 법칙
fork
는 병렬 계산의 결과에 영향을 끼치면 안된다. 라는 법칙이 있다.
fork { x } = x
fork
는 주 스레드와 분기된 별도의 논리적 스레드에서 비동기적으로 처리된다는 점을 제외하면 x와 동일하게 일을 해야한다.
- 이 단순한 속성은
fork
구현을 강하게 제약한다.
- 이렇게 법칙을 만들었으면 법칙을 깨보려고 시도해봐야한다.
두 Par 인스턴스가 동등한지 검사하는 단언문 함수를 작성해보자.
infix fun <A> Par<A>.shouldBe(other: Par<A>) = { es ->
if (this(es).get() != other(es).get())
throw AssertionError("Par instance not equal")
}
- 이 단언문 메소드를 사용하면 fork 구현에서 발생하는 미묘한 문제를 찾아낼 수 있다.
- 스레드 풀이 고정된
ExecutorService
를 쓰면 교착 상태에 빠지기 쉽다.
val es = Executors.newFixedThreadPool(1)
val a: Par<Int> = lazyUnit { 42 + 1 }
val b: Par<Int> = fork { a }
(a shouldBe b)(es)
위 코드에서는 교착 상태가 발생한다.
왜그럴까?
fork 안에서 Callable을 받는다.
fork { a }
는 fork { lazyUnit { 42 + 1 } }
이고, 이는 fork { fork { unit() } }
이다.
- fork 안에서 fork를 호출하고 있는것이다.
- 그러다 보니 받은
ExecutorService
에 Callable
을 전달해서 평가하는데 그 안에서 또 ExecutorService
에 Callable
을 제출하는 것이다.
- 근데 스레드 풀은 1개니까 교착상태가 발생한다.
이렇게 fork
구현에 대한 반례를 찾았다.
- 해결 방안
- 법칙이 성립하도록 구현을 수정
- 법칙이 언제 성립하는지를 더 명확히 기술하도록 법칙을 세밀화
- e.g. 스레드풀은 제한없이 커질 수 있다.
- 이런 법칙들은 문서화해야함
fork를 수정해서 고정 크기의 스레드 풀에서도 정상동작하게 할 수 있을지 보자.
fun <A> fork(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }
pa를 바로 평가했다.
- 이는 교착 상태를 방지해준다.
- 하지만 논리적 스레드를 분기하지 않았다.
- fork의 의도와 다르다.
- 하지만, 필요할 때까지 계산을 인스턴스화 하는 것을 지연시키므로, 유용하다.
delay라는 이름으로 바꾸겠다.
fun <A> delay(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }
콤비네이터를 일반적인 형태로 세분화 해보자.
- API를 구현하며 새로운 콤비네이터가 필요할 때가 있다.
- 새 콤비네이터를 만들기 전에 필요한 콤비네이터를 가장 일반적인 형태로 세분화할 수 있는지 생각해봐야한다.
- 필요한 콤비네이터가 더 일반적인 콤비네이터의 특별한 경우일 수 있기 떄문이다.
세분화하는 과정의 예시를 살펴보자.
fun <ITEM> choice(
condition: Par<Boolean>,
t: Par<ITEM>,
f: Par<ITEM>
): Par<ITEM>
- condition의 결과에 따라 t, f를 실행하고 싶다.
- condition의 평가를 블록한 상태로 기다리고 그 결과를 통해 어느 한쪽을 실행하도록 처리하면 된다.
fun <ITEM> choice(
condition: Par<Boolean>,
t: Par<ITEM>,
f: Par<ITEM>
): Par<ITEM> = { es: ExecutorService ->
when (run(es, condition).get()) {
true -> run(es, t)
false -> run(es, f)
}
}
의심해보자.
의심해보자.
의심해보자.
fun choice_2(
cond: Par,
t: Par,
f: Par
): Par = chooser(cond) {
when (it) {
true -> t
false -> f
}
}
fun choiceN_2(
cond: Par,
choices: List
): Par = chooser(cond) {
choices[it]
}
fun <K, V> choiceMap_2(
cond: Par,
choices: Map<K, Par>
): Par = chooser(cond) {
choices.getValue(it)
}
- 시그니처를 자세히 보자.
- `Par<A>`를 받아서 `Par<B>`를 반환한다.
- `flatMap`의 시그니처와 동일하다!
그렇다.
`choice`의 가장 기본연산은 `flatMap`이 었던 것이다.
###### 의심해보자.
- 과연 `flatMap`은 기본연산일까. 더 일반화 하지는 못할까?
- `flatMap`은 두 단계로 세분화 할 수 있다.
- Mapping
- Flatten
- Flatten 연산을 `join`이라고 하자.
- `join`은 모든 X타입에 대해 `Par<Par<X>>`를 `Par<X>`로 변환한다.
- 개념적으로 보면, `run`을 호출하면 내부 계산을 실행한 후 내부 계산의 완료를 기다렸다가 결과를 돌려주는 병렬계산이라고 할 수 있다.
- 연습문제 7.13
- join을 구현하라.
- join을 써서 flatMap을 구현하라.
- flatMap을 써서 join을 구현하라.
```kotlin
fun <ITEM> join(item: Par<Par<ITEM>>): Par<ITEM>
= { es: ExecutorService ->
run(es, run(es, item).get())
}
fun <ITEM, RESULT> flatMap(
item: Par<ITEM>,
block: (ITEM) -> Par<RESULT>
): Par<RESULT> = join(map(item, block))