병렬처리하는 법을 알아보자.
예를 들어보자.
[1, 2, 3, 4, 5 ,6, 7, 3, 32, 12, 23, 18, 10] 리스트의 합계를 구해보자.
fold를 사용해 계산한다고 하면, 이렇게 될 것이다.
1 + 2 + ...
3 + 4 + ...
7 + 5 + ...
12 + 6 + ...
18 + 7 + ...
25 + 3 + ...
28 + 32 + ...
60 + 12 + ...
72 + 23 + ...
95 + 18 + ...
113 + 10
123
총 시간은 O(n)만큼 걸린다.
분할 정복을 사용해보자.
[1, 2, 3, 4, 5 ,6, 7,]
[3, 32, 12, 23, 18, 10]
두 리스트의 합으로 볼 수 있다.
더 잘게 쪼개면
[1, 2, 3, 4]
[5 ,6, 7,]
[3, 32, 12]
[23, 18, 10]
이렇게 계속 쪼갤 수 있다.
쪼갠 각 리스트의 합을 모두 더하면 전체 합을 구할 수 있죠.
자 그럼 이제 병렬 처리할 단위를 구할 수 있습니다.
쪼갠 리스트를 각 스레드에 할당해서 계산하면 되겠죠~
사실 이번 장에서는 병렬처리를 어떻게 구현하느냐는 관심없스요.
그냥 병렬처리를 사용하는 함수형 API를 만드는 것이 목적인 것이요.
위에서의 예제를 코드로 봐볼게요.
fun sum(ints: List<Int>): Int =
if (ints.size <= 1) {
ints.firstOrNull() ?: 0
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
sum(l) + sum(r)
}sum(l) + sum(r) 에서 각 계산을 병렬로 처리해야하죠! 근데 지금은 sum(l)만 주구장창 계산하다. sum(r)을 계산하기 시작할거에요.
그래서 이건 병렬 계산이에요~ 라는 걸 알려줄 자료형을 정해봐요.
Parallel을 줄여서 Par 이라고 할게요.
class Par<ITEM>(val get: ITEM)
fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())
fun <ITEM> get(item: Par<ITEM>): ITEM = item.get 병렬계산을 하고 계산 결과를 item에 담는 자료형을 만들었습니다.
예제에 Par을 써보죠
fun sum2(ints: List<Int>): Int =
if (ints.size <= 1) {
ints.firstOrNull() ?: 0
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
val sumL: Par<Int> = unit { sum2(l) }
val sumR: Par<Int> = unit { sum2(r) }
sumL.get + sumR.get
}sumL.get + sumR.get 가 호출되기 전까지는 sum(l) , sum(r) 가 평가되지 않는다.
하지만 get을 호출하면 sum(l)이 모두 평가되고 난 후 sum(r)이 순차적으로 평가 될 것이다.
병렬처리를 하고싶어 만들었지만 실제로는 순차처리되는 것이다..
위 코드에서 sumL과 sumR을 치환해보면 참조투명성을 깨는 것을 볼 수 있다.
unit { sum(l) }.get + unit { sum(r) }.get결과는 똑같겠지만, unit { sum(l) } 이 모두 평가되어야 unit { sum(r) }이 초기화된다.
위에서 본 것과는 차이가 있다.
다만 get을 사용하지 않는다면 이런 참조 투명성을 해치는 부수효과가 없다.
이 부수효과를 해결하기 위해서 get 호출을 최대한 미뤄야한다.
어떻게 해야할까.
일단 get 호출을 하지 않는다는 것을 전제로하자.
그럼 sum 함수는 Par<Int> 를 반환하는 게 맞다.
fun sum3(ints: List<Int>): Par<Int> =
if (ints.size <= 1) {
unit { ints.firstOrNull() ?: 0 }
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
map2(sum3(l), sum3(r)) { lx: Int, rx: Int -> lx + rx }
}map2 함수를 만들어서 구현했다.
fun <ITEM1, ITEM2, RESULT> map2(item1: Par<ITEM1>, item2: Par<ITEM2>, block: (ITEM1, ITEM2) -> RESULT): Par<RESULT> =
unit { block(item1.get, item2.get) }이렇게 정의할 수 있다.
재귀적인 경우 더이상 unit을 호출하지 않는다.
자 그럼 이제 논의할 것은 map2의 인자를 지연 계산으로 받을 것이냐이다.
map2가 엄격한 인자를 사용할 때 문제점은 다음과 같다.
sum(listOf(1,2,3,4))
map2(
sum(listOf(1, 2)),
sum(listOf(3, 4))
)
map2(
map2(
listOf(1),
listOf(2)
),
sum(listOf(3, 4))
)
map2(
map2(
listOf(1),
listOf(2)
),
map2(
listOf(3),
listOf(4)
),
)이렇게 왼쪽을 완전히 계산한 다음 오른쪽을 계산하게 된다.
두 파라미터를 동시에 병렬 계산하지 않는 것이다.
두 파라미터를 동시에 병렬계산 하도록 하려면 map2를 지연계산으로 만들어야한다.
이렇게 결정하고
다음으로 볼 것은 map2는 항상 지연계산일 필요가 있냐는 것이다.
map2(
unit { 1 },
unit { 2 }
)이 경우에는 굳이 병렬로 할 필요가 없다. 오히려 성능이 안좋아진다.
스레드 분기를 명확하게 만들어주자.
다른 스레드에서 실행돼요~ 라는 의미를 가지는 fork 함수를 만들겠어요.
fun sum4(ints: List<Int>): Par<Int> =
if (ints.size <= 1) {
lazyUnit { ints.firstOrNull() ?: 0 }
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int ->
lx + rx }
}그렇다면 인자가 적으면 별도의 스레드를 사용하지 않겠다. 는 어떻게 할 수 있을까.
fun sum4(ints: List<Int>): Par<Int> =
if (ints.size <= 1) {
lazyUnit { ints.firstOrNull() ?: 0 }
} else {
val l = ints.subList(0, ints.size / 2)
val r = ints.subList(ints.size / 2, ints.size)
if (l.size <= 2) {
map2(sum4(l), fork { sum4(r) }) { lx: Int, rx: Int ->
lx + rx }
} else {
map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int ->
lx + rx }
}
}이렇게하는 걸까.
자. 어쨌든 fork를 써서 "별도의 스레드에서 실행하겠어"라는 것을 만들어줬다.
그럼 unit은 지연 계산할 필요가 있을까?
fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())unit 은 이렇게 만들었다.
앞에서 fork를 만들었으니, unit을 지연 계산하려면, fork를 사용하는 방법을 생각해 볼 수 있다.
그러면 엄격한 unit과 엄격하지 않은 unit을 만들 수 있다.
fun <ITEM> unit(item: ITEM): Par<ITEM> = Par(item)
fun <ITEM> lazyUnit(item: () -> ITEM): Par<ITEM> =
fork { unit(item()) }unit같은 기본형 콤비네이터로, lazyUnit같은 파생형 콤비네이터를 만들어냈다.
스레드 분기는 언제해야할까?
- fork를 호출했을 때 해야할까?
- get으로 평가를 했을 때 스레드를 분기해야 할까?
- fork에 스레드를 분기 책임이 있는 경우
- fork의 구현은 스레드를 생성하고, 작업을 스레드 풀에 제출하는 방법을 알아야함
- => 스레드풀 같은 곳에 전역적으로 접근이 가능해야함
- => fork를 호출하는 시점에 스레드풀 같은 자원이 초기화 되어있어야함
- 이는 프로그램에 여러 부분에서 원하는 대로 병렬성 전략을 제어할 수 있는 능력을 잃는다는 것
- *get이 스레드 생성과 실행할 작업을 스레드 풀에 제출하는 책임을 가지는 게 더 적합하다.
- ??
이제 fork를 통해서 병렬계산을하라고 '표시' 할 수 있고, Par는 해당 계산이 병렬 계산이라는 것을 알려주는 자료형이다.
사실상 fork와 Par는 병렬처리의 내부구현에 대해 알 필요가 없다.
Par는 그냥 값을 담는 컨테이너로만 생각하면 된다.
fun <ITEM> run(item: Par<ITEM>): ITEM = TODO()run이란 함수를 만들어서 run이 스레드를 시작하거나, 스레드 풀에 제출하거나 하는 등의 방법을 통해 병렬성을 구현하게 한다.
병렬 처리는 어떻게 해야할까?
java.util.concurrent.ExecutorService 클래스를 사용하자.
interface Callable<ITEM> {
fun call(): ITEM
}
interface Future<ITEM> {
fun get(): ITEM
fun get(timeout: Long, timeUnit: TimeUnit): ITEM
fun cancel(evenIfRunning: Boolean): Boolean
fun isDone(): Boolean
fun isCancelled(): Boolean
}
interface ExecutorService {
fun <ITEM> submit(c: Callable<ITEM>): Future<ITEM>
}대충 코틀린으로 찌끄려보면 이렇다.
ExecutorService에는 Callable을 제출할 수 있다.Future는 새로운 드레드에서 실행될 수도 있는 계산을 가리키는 핸들이다.Future의 블록킹 메소드인 get을 호출해서 결과를 얻어올 수 있다.
ExecutorService를 사용해서 run을 구현해보자.
typealias Par<ITEM> = (ExecutorService) -> Future<ITEM>
fun <ITEM> run(es: ExecutorService, item: Par<ITEM>): Future<ITEM>
= item(es)Par 타입을 새로 정의하고 ExecutorService를 전달하도록 했다.ExcutorSevice를 전달해야만 Future가 생긴다.
이제 우리가 만든 API를 탐구하고 다듬어보자.
object Pars {
fun <ITEM> unit(item: ITEM): Par<ITEM> = { es -> UnitFuture(item) }
data class UnitFuture<ITEM>(val item: ITEM) : Future<ITEM> {
override fun get(): ITEM = item
override fun get(timeout: Long, timeUnit: TimeUnit): ITEM = item
override fun cancel(evenIfRunning: Boolean): Boolean = false
override fun isDone(): Boolean = true
override fun isCancelled(): Boolean = false
}
fun <ITEM1, ITEM2, RESULT> map2(
item1: Par<ITEM1>,
item2: Par<ITEM2>,
block: (ITEM1, ITEM2) -> RESULT
): Par<RESULT> = { es ->
val future1 = item1(es)
val future2 = item2(es)
UnitFuture(block(future1.get(), future2.get()))
}
fun <ITEM> fork(
item: () -> Par<ITEM>
): Par<ITEM> = { es: ExecutorService ->
es.submit(object : Callable<ITEM> {
override fun call(): ITEM = item()(es).get()
})
}
}이렇게 Future의 구현체로 UnitFuture를 만들고 map2와 fork를 새로운 자료형을 통해 정의했다.
여기서 map2는 block()의 호출을 별도의 스레드에서 하지 않는다.
병렬성을 제어하려면 fork를 사용해야한다.block()의 평가를 별도의 스레드에서 수행하고 싶다면 fork로 감싸주자.
Pars.fork {
Pars.map2(Pars.unit(1), Pars.unit(2)) { item1, item2 ->
item1 + item2
}
}문제점
map2구현은 타임아웃을 준수하지 않는다.fork구현에서Callable은 내부 작업이 완료될 때 까지 블록된다.Future에는 순수함수형 인터체이스가 없다.- 고로 사용자가
Future를 직접 다루지 못하게 한다. - 하지만,
Future에 부수효과가 있더라도, 우리가 만든ParAPI는 순수함수형으로 남는다.??
- 고로 사용자가
연습문제 7.3
Future의 타임아웃을 준수하도록 map2 구현을 수정하라
fun <ITEM1, ITEM2, RESULT> map2_timeout( item1: Par<ITEM1>, item2: Par<ITEM2>, block: (ITEM1, ITEM2) -> RESULT ): Par<RESULT> = { es -> val future1 = item1(es) val future2 = item2(es) TimedMap2Future(future1, future2, block) }
data class TimedMap2Future<ITEM1, ITEM2, RESULT>(
val item1: Future,
val item2: Future,
val block: (ITEM1, ITEM2) -> RESULT
) : Future {
override fun get(): RESULT {
TODO("Not yet implemented")
}
override fun get(timeout: Long, timeUnit: TimeUnit): RESULT {
val timeoutMillis = TimeUnit.MICROSECONDS.convert(timeout, timeUnit)
val start = System.currentTimeMillis()
val a = item1.get(timeout, timeUnit)
val duration = System.currentTimeMillis() - start
val remainder = timeoutMillis - duration
val b = item2.get(remainder, TimeUnit.MICROSECONDS)
return block(a, b)
}
override fun cancel(evenIfRunning: Boolean): Boolean {
TODO("Not yet implemented")
}
override fun isDone(): Boolean {
TODO("Not yet implemented")
}
override fun isCancelled(): Boolean {
TODO("Not yet implemented")
} }
- **연습문제 7.4**
- lazyUnit을 사용해 (A) -> B 타입의 임의의 함수를 비동기적으로 결과를 평가하는 함수로 변환하는 asyncF를 작성하라.
```kotlin
fun <ITEM, RESULT> asyncF(f: (ITEM) -> RESULT): (ITEM) -> Par<RESULT> =
{ item ->
lazyUnit { f(item) }
}기존 콤비네이터로 더 구현해보자.
sortPar
Par<List<Int>>를 정렬해보자.
Par.run()해서 평가하고, 결과 리스트를 받아 정렬한 다음unit을 이용해 다시 감싸기map2를 이용해 내부 리스트에 접근하기
fun sortPar(parList: Par<List<Int>>): Par<List<Int>> =
map2(parList, unit(Unit)) { list, _ -> list.sorted() }map2를 통해서만 내부의 값을 바꿀 수 있다.
- 그러니 우선 map2를 써야겠다.
- 하나의 인자만 필요하니 다른 건 빈값을 넣자.
이제는
Par<A>타입을 받아Par<B>를 반환하는map연산을 만들 수 있다.fun <ITEM, RESULT> map(pa: Par<ITEM>, block: (ITEM) -> RESULT): Par<RESULT> = map2(pa, unit(Unit)) { a, _ -> block(a) }이제 다시,
map으로sortPar를 구현해보자.fun sortPar2(parList: Par<List<Int>>): Par<List<Int>> = map(parList) { it.sorted() }map2로map을 구현하였다.이렇다는 것은,
map2가 더 강력한 기본 요소인 것이다.parMap
리스트에 대해 병렬로
map을 수행해보자.
fun <ITEM, RESULT> parMap(
list: List<ITEM>,
block: (ITEM) -> RESULT
): Par<List<RESULT>> {
val fbs: List<Par<RESULT>> = list.map(asyncF(block))
TODO()
}우선
List<ITEM>을List<Par<RESULT>타입으로 바꿔주었다.그 다음은,
List<Par<RESULT>>를Par<List<RESULT>>로 바꿔주면 된다.- 이 연산은 무엇일까?
sequence이다.
- 이 연산은 무엇일까?
연습문제 7.5
run을 호출하지 않고sequence를 작성하여라val <T> List<T>.head: T get() = first()
val List.tail: List
get() = this.drop(1)
val Nil = listOf()
fun sequence1(ps: List =
when (ps) {
Nil -> unit(Nil)
else -> map2(
ps.head,
sequence1(ps.tail)
) { a: ITEM, b: List ->
listOf(a) + b
}
}
fun sequence(ps: List =
when {
ps.isEmpty() -> unit(Nil)
ps.size == 1 -> map(ps.head) { listOf(it) }
else -> {
val l = ps.subList(0, ps.size / 2)
val r = ps.subList(ps.size / 2, ps.size)
map2(sequence(l), sequence(r)) { la, lb ->
la + lb
}
}
}
- 이렇게 2가지 버전으로 기본연산 `sequence`를 만들었다.
- 이제 `parMap`을 구현할 수 있다.
```kotlin
fun <ITEM, RESULT> parMap(
list: List<ITEM>,
block: (ITEM) -> RESULT
): Par<List<RESULT>> {
val fbs: List<Par<RESULT>> = list.map(asyncF(block))
return sequence1(fbs)
}연습문제 7.6
리스트의 원소를 병렬로 걸러내는 parFilter 구현
fun <ITEM> parFilter( list: List<ITEM>, block: (ITEM) -> Boolean ): Par<List<ITEM>> { val pars: List<Par<ITEM>> = list.map { lazyUnit { it } } return map(sequence(pars)) { la -> la.flatMap { a -> if (block(a)) listOf(a) else emptyList() } } }
정의한 API의 법칙을 형식화해보자.
- 매핑(mapping)
- 논리 스레드 분기(forking)
두 가지에 대해 알아보자.
매핑 법칙
map(unit(1)) { it + 1 } == unit(2)이 식은 unit(1)을 { i + 1 }로 매핑한 것이 unit(2)와 같음을 뜻한다.Par객체가 동치라는 것은 모든 올바른 ExecutorService에 대해 두 Par 객체가 내놓는Future의 결과가 같다는 뜻이다.
이 함수를 일반화 해보자.
map(unit(x), f) == unit(f(x))이 법칙은 우리에게 몇가지 제약을 준다.
- unit 구현은 자신이 받는 값을 들여다 볼 수 없다. 그저 전달할 뿐이다.
ExecutorService에Callable을 전달할 때도 비슷했다.Callable객체에 대한 어떤 가정도 할 수 없고Callable에 따라 어떤 분기도 할 수 없다. 그저 실행할 뿐이다.
- map과 unit의 구현에서 다운캐스팅을 하거나 타입캐스팅을 금지한다.
앞서 본 규칙을 더 단순화해보자.
- 어떤 x와 f를 사용하든 법칙이 성립해야한다.
- f가 항등함수라면 어떨까?
fun <A> id(a: A):A = a
val x = 1
val y = unit(x)
val f = { a: Int -> a + 1 }
val id = { a: Int -> a }
// 초기 법칙
map(unit(x), f) == unit(f(x))
// f를 id로 바꿔보자.
map(unit(x), id) == unit(id(x))
map(unit(x), id) == unit(x) // id(x) == x
map(y, id) == y // unit(x) == y- unit에 대한 언급 없이 map에 대해서만 성립하는 법칙을 만들었다.
- map이 할 수 있는 일은 f의 결과를 y에 적용시키는 것이다.
- map은 함수를 적용시키기 전에 예외를 던지거나 비정상 종료를 할 수 없다.
- f가 항등함수인 id라면 y는 영향받지 않고 반환돼야한다.
- 이를
map이 구조보존적이어야 한다라고 말한다.
- 이를
논리 스레드 분기의 법칙
fork는 병렬 계산의 결과에 영향을 끼치면 안된다. 라는 법칙이 있다.
fork { x } = xfork는 주 스레드와 분기된 별도의 논리적 스레드에서 비동기적으로 처리된다는 점을 제외하면 x와 동일하게 일을 해야한다.- 이 단순한 속성은
fork구현을 강하게 제약한다.- 이렇게 법칙을 만들었으면 법칙을 깨보려고 시도해봐야한다.
두 Par 인스턴스가 동등한지 검사하는 단언문 함수를 작성해보자.
infix fun <A> Par<A>.shouldBe(other: Par<A>) = { es ->
if (this(es).get() != other(es).get())
throw AssertionError("Par instance not equal")
}- 이 단언문 메소드를 사용하면 fork 구현에서 발생하는 미묘한 문제를 찾아낼 수 있다.
- 스레드 풀이 고정된
ExecutorService를 쓰면 교착 상태에 빠지기 쉽다.
- 스레드 풀이 고정된
val es = Executors.newFixedThreadPool(1)
val a: Par<Int> = lazyUnit { 42 + 1 }
val b: Par<Int> = fork { a }
(a shouldBe b)(es)위 코드에서는 교착 상태가 발생한다.
왜그럴까?
fork의 구현을 보자.
fun <ITEM> fork( item: () -> Par<ITEM> ): Par<ITEM> = { es: ExecutorService -> es.submit(object : Callable<ITEM> { override fun call(): ITEM = item()(es).get() }) }
fork 안에서 Callable을 받는다.
fork { a }는fork { lazyUnit { 42 + 1 } }이고, 이는fork { fork { unit() } }이다.- fork 안에서 fork를 호출하고 있는것이다.
- 그러다 보니 받은
ExecutorService에Callable을 전달해서 평가하는데 그 안에서 또ExecutorService에Callable을 제출하는 것이다. - 근데 스레드 풀은 1개니까 교착상태가 발생한다.
이렇게
fork구현에 대한 반례를 찾았다.- 해결 방안
- 법칙이 성립하도록 구현을 수정
- 법칙이 언제 성립하는지를 더 명확히 기술하도록 법칙을 세밀화
- e.g. 스레드풀은 제한없이 커질 수 있다.
- 이런 법칙들은 문서화해야함
- 해결 방안
fork를 수정해서 고정 크기의 스레드 풀에서도 정상동작하게 할 수 있을지 보자.
fun <A> fork(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }pa를 바로 평가했다.
- 이는 교착 상태를 방지해준다.
- 하지만 논리적 스레드를 분기하지 않았다.
- fork의 의도와 다르다.
- 하지만, 필요할 때까지 계산을 인스턴스화 하는 것을 지연시키므로, 유용하다.
delay라는 이름으로 바꾸겠다.
fun <A> delay(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }
콤비네이터를 일반적인 형태로 세분화 해보자.
- API를 구현하며 새로운 콤비네이터가 필요할 때가 있다.
- 새 콤비네이터를 만들기 전에 필요한 콤비네이터를 가장 일반적인 형태로 세분화할 수 있는지 생각해봐야한다.
- 필요한 콤비네이터가 더 일반적인 콤비네이터의 특별한 경우일 수 있기 떄문이다.
세분화하는 과정의 예시를 살펴보자.
fun <ITEM> choice(
condition: Par<Boolean>,
t: Par<ITEM>,
f: Par<ITEM>
): Par<ITEM>- condition의 결과에 따라 t, f를 실행하고 싶다.
- condition의 평가를 블록한 상태로 기다리고 그 결과를 통해 어느 한쪽을 실행하도록 처리하면 된다.
fun <ITEM> choice(
condition: Par<Boolean>,
t: Par<ITEM>,
f: Par<ITEM>
): Par<ITEM> = { es: ExecutorService ->
when (run(es, condition).get()) {
true -> run(es, t)
false -> run(es, f)
}
}의심해보자.
Boolean을 사용하고 두 개 중에 한개만 택한다는 것은 좀 임의적인 것 같다.
두개여야 할 이유는 무엇일까, N개를 받도록 변경해보자.
연습문제 7.10
choiceN을 구현하라.
choiceN을 이용해 choice를 구현하라.
fun <ITEM> choiceN(n: Par<Int>, choices: List<Par<ITEM>>): Par<ITEM> = { es: ExecutorService -> run(es, choices[run(es, cond).get()]) }
의심해보자.
왜 List일까. Map으로 해서 1:1로 찾을 수 있지 않을까?
연습문제 7.11
choiceMap 콤비네이터를 구현하라.
fun <K, V> choiceMap( key: Par<K>, choices: Map<K, Par<V>> ): Par<V> = { es: ExecutorService -> run(es, choices.getValue(run(es, key).get())) }
의심해보자.
Map도 너무 임의적이지 않나
실제로 여기서 맵은
(K) -> Par<V>를 반환하는 것만 한다.모든 케이스를 아우르는 API를 만들자.
연습문제 7.12
새 기본요소 chooser를 구현하라.
chooser를 사용해서 choice, choiceN, choiceMap을 구현하라.
fun <A, B> chooser(pa: Par<A>, choices: (A) -> Par<B>): Par<B> = { es: ExecutorService -> run(es, choices(run(es, pa).get())) }
fun choice_2(
cond: Par,
t: Par,
f: Par
): Par = chooser(cond) {
when (it) {
true -> t
false -> f
}
}
fun choiceN_2(
cond: Par,
choices: List
): Par = chooser(cond) {
choices[it]
}
fun <K, V> choiceMap_2(
cond: Par,
choices: Map<K, Par>
): Par = chooser(cond) {
choices.getValue(it)
}
- 시그니처를 자세히 보자.
- `Par<A>`를 받아서 `Par<B>`를 반환한다.
- `flatMap`의 시그니처와 동일하다!
그렇다.
`choice`의 가장 기본연산은 `flatMap`이 었던 것이다.
###### 의심해보자.
- 과연 `flatMap`은 기본연산일까. 더 일반화 하지는 못할까?
- `flatMap`은 두 단계로 세분화 할 수 있다.
- Mapping
- Flatten
- Flatten 연산을 `join`이라고 하자.
- `join`은 모든 X타입에 대해 `Par<Par<X>>`를 `Par<X>`로 변환한다.
- 개념적으로 보면, `run`을 호출하면 내부 계산을 실행한 후 내부 계산의 완료를 기다렸다가 결과를 돌려주는 병렬계산이라고 할 수 있다.
- 연습문제 7.13
- join을 구현하라.
- join을 써서 flatMap을 구현하라.
- flatMap을 써서 join을 구현하라.
```kotlin
fun <ITEM> join(item: Par<Par<ITEM>>): Par<ITEM>
= { es: ExecutorService ->
run(es, run(es, item).get())
}
fun <ITEM, RESULT> flatMap(
item: Par<ITEM>,
block: (ITEM) -> Par<RESULT>
): Par<RESULT> = join(map(item, block))'Programming > 코틀린 함수형 프로그래밍' 카테고리의 다른 글
| 6. 순수 함수형 상태 (0) | 2023.10.20 |
|---|---|
| 5. 엄격성과 지연성 (0) | 2023.10.15 |
| 함수형 프로그래밍의 Fold(FoldLeft, FoldRight의 차이점) (1) | 2023.09.18 |
| 4. 예외를 사용하지 않고 오류 다루기 - Functional Programming in Kotlin (0) | 2023.09.10 |
| 3. 함수형 데이터 구조 - Functional Programming in Kotlin (0) | 2023.09.10 |