병렬처리하는 법을 알아보자.

예를 들어보자.
[1, 2, 3, 4, 5 ,6, 7, 3, 32, 12, 23, 18, 10] 리스트의 합계를 구해보자.

fold를 사용해 계산한다고 하면, 이렇게 될 것이다.

1 + 2 + ...
3 + 4 + ...
7 + 5 + ...
12 + 6 + ...
18 + 7 + ...
25 + 3 + ...
28 + 32 + ...
60 + 12 + ...
72 + 23 + ...
95 + 18 + ...
113 + 10
123

총 시간은 O(n)만큼 걸린다.
분할 정복을 사용해보자.

[1, 2, 3, 4, 5 ,6, 7,]
[3, 32, 12, 23, 18, 10]
두 리스트의 합으로 볼 수 있다.
더 잘게 쪼개면

[1, 2, 3, 4]
[5 ,6, 7,]
[3, 32, 12]
[23, 18, 10]
이렇게 계속 쪼갤 수 있다.
쪼갠 각 리스트의 합을 모두 더하면 전체 합을 구할 수 있죠.

자 그럼 이제 병렬 처리할 단위를 구할 수 있습니다.
쪼갠 리스트를 각 스레드에 할당해서 계산하면 되겠죠~

사실 이번 장에서는 병렬처리를 어떻게 구현하느냐는 관심없스요.
그냥 병렬처리를 사용하는 함수형 API를 만드는 것이 목적인 것이요.

위에서의 예제를 코드로 봐볼게요.

fun sum(ints: List<Int>): Int =  
    if (ints.size <= 1) {  
        ints.firstOrNull() ?: 0  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        sum(l) + sum(r)  
    }

sum(l) + sum(r) 에서 각 계산을 병렬로 처리해야하죠! 근데 지금은 sum(l)만 주구장창 계산하다. sum(r)을 계산하기 시작할거에요.

그래서 이건 병렬 계산이에요~ 라는 걸 알려줄 자료형을 정해봐요.

Parallel을 줄여서 Par 이라고 할게요.

class Par<ITEM>(val get: ITEM)  

fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())

fun <ITEM> get(item: Par<ITEM>): ITEM = item.get  

병렬계산을 하고 계산 결과를 item에 담는 자료형을 만들었습니다.
예제에 Par을 써보죠

fun sum2(ints: List<Int>): Int =  
    if (ints.size <= 1) {  
        ints.firstOrNull() ?: 0  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        val sumL: Par<Int> = unit { sum2(l) }  
        val sumR: Par<Int> = unit { sum2(r) }  
        sumL.get + sumR.get  
    }

sumL.get + sumR.get 가 호출되기 전까지는 sum(l) , sum(r) 가 평가되지 않는다.
하지만 get을 호출하면 sum(l)이 모두 평가되고 난 후 sum(r)이 순차적으로 평가 될 것이다.

병렬처리를 하고싶어 만들었지만 실제로는 순차처리되는 것이다..

위 코드에서 sumLsumR을 치환해보면 참조투명성을 깨는 것을 볼 수 있다.

unit { sum(l) }.get + unit { sum(r) }.get

결과는 똑같겠지만, unit { sum(l) } 이 모두 평가되어야 unit { sum(r) }이 초기화된다.
위에서 본 것과는 차이가 있다.
다만 get을 사용하지 않는다면 이런 참조 투명성을 해치는 부수효과가 없다.

이 부수효과를 해결하기 위해서 get 호출을 최대한 미뤄야한다.

어떻게 해야할까.

일단 get 호출을 하지 않는다는 것을 전제로하자.

그럼 sum 함수는 Par<Int> 를 반환하는 게 맞다.

fun sum3(ints: List<Int>): Par<Int> =  
    if (ints.size <= 1) {  
        unit { ints.firstOrNull() ?: 0 }  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        map2(sum3(l), sum3(r)) { lx: Int, rx: Int -> lx + rx }  
    }

map2 함수를 만들어서 구현했다.

fun <ITEM1, ITEM2, RESULT> map2(item1: Par<ITEM1>, item2: Par<ITEM2>, block: (ITEM1, ITEM2) -> RESULT): Par<RESULT> =  
    unit { block(item1.get, item2.get) }

이렇게 정의할 수 있다.
재귀적인 경우 더이상 unit을 호출하지 않는다.

자 그럼 이제 논의할 것은 map2의 인자를 지연 계산으로 받을 것이냐이다.

map2가 엄격한 인자를 사용할 때 문제점은 다음과 같다.

sum(listOf(1,2,3,4))

map2(
    sum(listOf(1, 2)),
    sum(listOf(3, 4))
)

map2(
    map2(
        listOf(1),
        listOf(2)
    ),
    sum(listOf(3, 4))
)

map2(
    map2(
        listOf(1),
        listOf(2)
    ),
    map2(
        listOf(3),
        listOf(4)
    ),
)

이렇게 왼쪽을 완전히 계산한 다음 오른쪽을 계산하게 된다.
두 파라미터를 동시에 병렬 계산하지 않는 것이다.

두 파라미터를 동시에 병렬계산 하도록 하려면 map2를 지연계산으로 만들어야한다.
이렇게 결정하고

다음으로 볼 것은 map2는 항상 지연계산일 필요가 있냐는 것이다.
map2(
    unit { 1 },
    unit { 2 }
)

이 경우에는 굳이 병렬로 할 필요가 없다. 오히려 성능이 안좋아진다.
스레드 분기를 명확하게 만들어주자.
다른 스레드에서 실행돼요~ 라는 의미를 가지는 fork 함수를 만들겠어요.

fun sum4(ints: List<Int>): Par<Int> =  
    if (ints.size <= 1) {  
        lazyUnit { ints.firstOrNull() ?: 0 }  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int -> 
            lx + rx }
    }

그렇다면 인자가 적으면 별도의 스레드를 사용하지 않겠다. 는 어떻게 할 수 있을까.

fun sum4(ints: List<Int>): Par<Int> =  
    if (ints.size <= 1) {  
        lazyUnit { ints.firstOrNull() ?: 0 }  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        if (l.size <= 2) {
            map2(sum4(l), fork { sum4(r) }) { lx: Int, rx: Int -> 
                lx + rx }
        } else {
            map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int -> 
                lx + rx }
        }
    }

이렇게하는 걸까.

자. 어쨌든 fork를 써서 "별도의 스레드에서 실행하겠어"라는 것을 만들어줬다.

그럼 unit은 지연 계산할 필요가 있을까?
fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())

unit 은 이렇게 만들었다.
앞에서 fork를 만들었으니, unit을 지연 계산하려면, fork를 사용하는 방법을 생각해 볼 수 있다.
그러면 엄격한 unit과 엄격하지 않은 unit을 만들 수 있다.

fun <ITEM> unit(item: ITEM): Par<ITEM> = Par(item)
fun <ITEM> lazyUnit(item: () -> ITEM): Par<ITEM> = 
    fork { unit(item()) }

unit같은 기본형 콤비네이터로, lazyUnit같은 파생형 콤비네이터를 만들어냈다.

스레드 분기는 언제해야할까?
  • fork를 호출했을 때 해야할까?
  • get으로 평가를 했을 때 스레드를 분기해야 할까?
  • fork에 스레드를 분기 책임이 있는 경우
    • fork의 구현은 스레드를 생성하고, 작업을 스레드 풀에 제출하는 방법을 알아야함
    • => 스레드풀 같은 곳에 전역적으로 접근이 가능해야함
    • => fork를 호출하는 시점에 스레드풀 같은 자원이 초기화 되어있어야함
    • 이는 프로그램에 여러 부분에서 원하는 대로 병렬성 전략을 제어할 수 있는 능력을 잃는다는 것
  • *get이 스레드 생성과 실행할 작업을 스레드 풀에 제출하는 책임을 가지는 게 더 적합하다.
    • ??

이제 fork를 통해서 병렬계산을하라고 '표시' 할 수 있고, Par는 해당 계산이 병렬 계산이라는 것을 알려주는 자료형이다.
사실상 fork와 Par는 병렬처리의 내부구현에 대해 알 필요가 없다.
Par는 그냥 값을 담는 컨테이너로만 생각하면 된다.

fun <ITEM> run(item: Par<ITEM>): ITEM = TODO()

run이란 함수를 만들어서 run이 스레드를 시작하거나, 스레드 풀에 제출하거나 하는 등의 방법을 통해 병렬성을 구현하게 한다.

병렬 처리는 어떻게 해야할까?

java.util.concurrent.ExecutorService 클래스를 사용하자.

interface Callable<ITEM> {  
    fun call(): ITEM  
}  

interface Future<ITEM> {  
    fun get(): ITEM  
    fun get(timeout: Long, timeUnit: TimeUnit): ITEM  
    fun cancel(evenIfRunning: Boolean): Boolean  
    fun isDone(): Boolean  
    fun isCancelled(): Boolean  
}  

interface ExecutorService {  
    fun <ITEM> submit(c: Callable<ITEM>): Future<ITEM>  
}

대충 코틀린으로 찌끄려보면 이렇다.

ExecutorService에는 Callable을 제출할 수 있다.
Future는 새로운 드레드에서 실행될 수도 있는 계산을 가리키는 핸들이다.
Future의 블록킹 메소드인 get을 호출해서 결과를 얻어올 수 있다.

ExecutorService를 사용해서 run을 구현해보자.
typealias Par<ITEM> = (ExecutorService) -> Future<ITEM>  
fun <ITEM> run(es: ExecutorService, item: Par<ITEM>): Future<ITEM> 
    = item(es)

Par 타입을 새로 정의하고 ExecutorService를 전달하도록 했다.
ExcutorSevice를 전달해야만 Future가 생긴다.

이제 우리가 만든 API를 탐구하고 다듬어보자.
object Pars {  
    fun <ITEM> unit(item: ITEM): Par<ITEM> = { es -> UnitFuture(item) }  

    data class UnitFuture<ITEM>(val item: ITEM) : Future<ITEM> {  
        override fun get(): ITEM = item  

        override fun get(timeout: Long, timeUnit: TimeUnit): ITEM = item  

        override fun cancel(evenIfRunning: Boolean): Boolean = false  

        override fun isDone(): Boolean = true  

        override fun isCancelled(): Boolean = false  
    }  

    fun <ITEM1, ITEM2, RESULT> map2(  
        item1: Par<ITEM1>,  
        item2: Par<ITEM2>,  
        block: (ITEM1, ITEM2) -> RESULT  
    ): Par<RESULT> = { es ->  
        val future1 = item1(es)  
        val future2 = item2(es)  
        UnitFuture(block(future1.get(), future2.get()))  
    }  

    fun <ITEM> fork(  
        item: () -> Par<ITEM>  
    ): Par<ITEM> = { es: ExecutorService ->  
        es.submit(object : Callable<ITEM> {  
            override fun call(): ITEM = item()(es).get()  
        })  
    }  
}

이렇게 Future의 구현체로 UnitFuture를 만들고 map2fork를 새로운 자료형을 통해 정의했다.

여기서 map2block()의 호출을 별도의 스레드에서 하지 않는다.
병렬성을 제어하려면 fork를 사용해야한다.
block()의 평가를 별도의 스레드에서 수행하고 싶다면 fork로 감싸주자.

Pars.fork {  
    Pars.map2(Pars.unit(1), Pars.unit(2)) { item1, item2 ->  
        item1 + item2  
    }  
}
  • 문제점

    1. map2구현은 타임아웃을 준수하지 않는다.
    2. fork 구현에서 Callable은 내부 작업이 완료될 때 까지 블록된다.
    3. Future에는 순수함수형 인터체이스가 없다.
      • 고로 사용자가 Future를 직접 다루지 못하게 한다.
      • 하지만, Future에 부수효과가 있더라도, 우리가 만든 Par API는 순수함수형으로 남는다. ??
  • 연습문제 7.3

    • Future의 타임아웃을 준수하도록 map2 구현을 수정하라

      fun <ITEM1, ITEM2, RESULT> map2_timeout(  
      item1: Par<ITEM1>,  
      item2: Par<ITEM2>,  
      block: (ITEM1, ITEM2) -> RESULT  
      ): Par<RESULT> = { es ->  
      val future1 = item1(es)  
      val future2 = item2(es)  
      TimedMap2Future(future1, future2, block)   
      }

data class TimedMap2Future<ITEM1, ITEM2, RESULT>(
val item1: Future,
val item2: Future,
val block: (ITEM1, ITEM2) -> RESULT
) : Future {
override fun get(): RESULT {
TODO("Not yet implemented")
}

override fun get(timeout: Long, timeUnit: TimeUnit): RESULT {  
    val timeoutMillis = TimeUnit.MICROSECONDS.convert(timeout, timeUnit)  
    val start = System.currentTimeMillis()  
    val a = item1.get(timeout, timeUnit)  
    val duration = System.currentTimeMillis() - start  
    val remainder = timeoutMillis - duration  
    val b = item2.get(remainder, TimeUnit.MICROSECONDS)  
    return block(a, b)  
}  

override fun cancel(evenIfRunning: Boolean): Boolean {  
    TODO("Not yet implemented")  
}  
override fun isDone(): Boolean {  
    TODO("Not yet implemented")  
}  
override fun isCancelled(): Boolean {  
    TODO("Not yet implemented")  
}  

}


- **연습문제 7.4**
    - lazyUnit을 사용해 (A) -> B 타입의 임의의 함수를 비동기적으로 결과를 평가하는 함수로 변환하는 asyncF를 작성하라.
```kotlin
fun <ITEM, RESULT> asyncF(f: (ITEM) -> RESULT): (ITEM) -> Par<RESULT> =
    { item ->
        lazyUnit { f(item) }
    }
기존 콤비네이터로 더 구현해보자.
sortPar
  • Par<List<Int>>를 정렬해보자.
  1. Par.run()해서 평가하고, 결과 리스트를 받아 정렬한 다음 unit을 이용해 다시 감싸기
  2. map2를 이용해 내부 리스트에 접근하기
fun sortPar(parList: Par<List<Int>>): Par<List<Int>> =  
    map2(parList, unit(Unit)) { list, _ -> list.sorted() }
  • map2를 통해서만 내부의 값을 바꿀 수 있다.

    • 그러니 우선 map2를 써야겠다.
    • 하나의 인자만 필요하니 다른 건 빈값을 넣자.
  • 이제는 Par<A>타입을 받아 Par<B>를 반환하는 map연산을 만들 수 있다.

  • fun <ITEM, RESULT> map(pa: Par<ITEM>, block: (ITEM) -> RESULT): Par<RESULT> = map2(pa, unit(Unit)) { a, _ -> block(a) }

  • 이제 다시, map으로 sortPar를 구현해보자.

  • fun sortPar2(parList: Par<List<Int>>): Par<List<Int>> = map(parList) { it.sorted() }

  • map2map을 구현하였다.

    • 이렇다는 것은, map2가 더 강력한 기본 요소인 것이다.

      parMap
  • 리스트에 대해 병렬로 map을 수행해보자.

fun <ITEM, RESULT> parMap(  
    list: List<ITEM>,  
    block: (ITEM) -> RESULT  
): Par<List<RESULT>> {  
    val fbs: List<Par<RESULT>> = list.map(asyncF(block))  
    TODO()  
}
  • 우선 List<ITEM>List<Par<RESULT>타입으로 바꿔주었다.

  • 그 다음은, List<Par<RESULT>>Par<List<RESULT>>로 바꿔주면 된다.

    • 이 연산은 무엇일까?
      • sequence이다.
  • 연습문제 7.5

    • run을 호출하지 않고 sequence를 작성하여라

      val <T> List<T>.head: T  
      get() = first()  

val List.tail: List
get() = this.drop(1)

val Nil = listOf()

fun sequence1(ps: List): Par =
when (ps) {
Nil -> unit(Nil)
else -> map2(
ps.head,
sequence1(ps.tail)
) { a: ITEM, b: List ->
listOf(a) + b
}
}

fun sequence(ps: List): Par =
when {
ps.isEmpty() -> unit(Nil)
ps.size == 1 -> map(ps.head) { listOf(it) }
else -> {
val l = ps.subList(0, ps.size / 2)
val r = ps.subList(ps.size / 2, ps.size)
map2(sequence(l), sequence(r)) { la, lb ->
la + lb
}
}
}

- 이렇게 2가지 버전으로 기본연산 `sequence`를 만들었다.

- 이제 `parMap`을 구현할 수 있다.
```kotlin
fun <ITEM, RESULT> parMap(  
    list: List<ITEM>,  
    block: (ITEM) -> RESULT  
): Par<List<RESULT>> {  
    val fbs: List<Par<RESULT>> = list.map(asyncF(block))  
    return sequence1(fbs)
}
  • 연습문제 7.6

    • 리스트의 원소를 병렬로 걸러내는 parFilter 구현

      fun <ITEM> parFilter(
      list: List<ITEM>, 
      block: (ITEM) -> Boolean
      ): Par<List<ITEM>> {  
      val pars: List<Par<ITEM>> = list.map { lazyUnit { it } }  
      return map(sequence(pars)) { la ->  
        la.flatMap { a ->  
            if (block(a)) listOf(a) else emptyList()  
        }  
      }
      }
정의한 API의 법칙을 형식화해보자.
  • 매핑(mapping)
  • 논리 스레드 분기(forking)

두 가지에 대해 알아보자.

매핑 법칙
map(unit(1)) { it + 1 } == unit(2)

이 식은 unit(1){ i + 1 }로 매핑한 것이 unit(2)와 같음을 뜻한다.
Par객체가 동치라는 것은 모든 올바른 ExecutorService에 대해 두 Par 객체가 내놓는Future의 결과가 같다는 뜻이다.

이 함수를 일반화 해보자.
map(unit(x), f) == unit(f(x))

이 법칙은 우리에게 몇가지 제약을 준다.

  1. unit 구현은 자신이 받는 값을 들여다 볼 수 없다. 그저 전달할 뿐이다.
    • ExecutorServiceCallable을 전달할 때도 비슷했다. Callable객체에 대한 어떤 가정도 할 수 없고 Callable에 따라 어떤 분기도 할 수 없다. 그저 실행할 뿐이다.
  2. map과 unit의 구현에서 다운캐스팅을 하거나 타입캐스팅을 금지한다.
앞서 본 규칙을 더 단순화해보자.
  • 어떤 x와 f를 사용하든 법칙이 성립해야한다.
  • f가 항등함수라면 어떨까? fun <A> id(a: A):A = a
val x = 1
val y = unit(x)
val f = { a: Int -> a + 1 }
val id = { a: Int -> a }

// 초기 법칙
map(unit(x), f) == unit(f(x))

// f를 id로 바꿔보자.
map(unit(x), id) == unit(id(x))
map(unit(x), id) == unit(x) // id(x) == x
map(y, id) == y // unit(x) == y
  • unit에 대한 언급 없이 map에 대해서만 성립하는 법칙을 만들었다.
  • map이 할 수 있는 일은 f의 결과를 y에 적용시키는 것이다.
    • map은 함수를 적용시키기 전에 예외를 던지거나 비정상 종료를 할 수 없다.
    • f가 항등함수인 id라면 y는 영향받지 않고 반환돼야한다.
      • 이를 map이 구조보존적이어야 한다 라고 말한다.
논리 스레드 분기의 법칙
  • fork는 병렬 계산의 결과에 영향을 끼치면 안된다. 라는 법칙이 있다.
fork { x } = x
  • fork는 주 스레드와 분기된 별도의 논리적 스레드에서 비동기적으로 처리된다는 점을 제외하면 x와 동일하게 일을 해야한다.
  • 이 단순한 속성은 fork 구현을 강하게 제약한다.
    • 이렇게 법칙을 만들었으면 법칙을 깨보려고 시도해봐야한다.
두 Par 인스턴스가 동등한지 검사하는 단언문 함수를 작성해보자.
infix fun <A> Par<A>.shouldBe(other: Par<A>) = { es ->
    if (this(es).get() != other(es).get())
        throw AssertionError("Par instance not equal")
}
  • 이 단언문 메소드를 사용하면 fork 구현에서 발생하는 미묘한 문제를 찾아낼 수 있다.
    • 스레드 풀이 고정된 ExecutorService를 쓰면 교착 상태에 빠지기 쉽다.
val es = Executors.newFixedThreadPool(1)

val a: Par<Int> = lazyUnit { 42 + 1 }
val b: Par<Int> = fork { a }

(a shouldBe b)(es)
  • 위 코드에서는 교착 상태가 발생한다.

  • 왜그럴까?

    • fork의 구현을 보자.

      fun <ITEM> fork(  
      item: () -> Par<ITEM>  
      ): Par<ITEM> = { es: ExecutorService ->  
      es.submit(object : Callable<ITEM> {  
        override fun call(): ITEM = item()(es).get()  
      })  
      }
  • fork 안에서 Callable을 받는다.

  • fork { a }fork { lazyUnit { 42 + 1 } }이고, 이는 fork { fork { unit() } } 이다.

    • fork 안에서 fork를 호출하고 있는것이다.
    • 그러다 보니 받은 ExecutorServiceCallable을 전달해서 평가하는데 그 안에서 또 ExecutorServiceCallable을 제출하는 것이다.
    • 근데 스레드 풀은 1개니까 교착상태가 발생한다.
  • 이렇게 fork 구현에 대한 반례를 찾았다.

    • 해결 방안
      1. 법칙이 성립하도록 구현을 수정
      2. 법칙이 언제 성립하는지를 더 명확히 기술하도록 법칙을 세밀화
        • e.g. 스레드풀은 제한없이 커질 수 있다.
        • 이런 법칙들은 문서화해야함
  • fork를 수정해서 고정 크기의 스레드 풀에서도 정상동작하게 할 수 있을지 보자.

  • fun <A> fork(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }

  • pa를 바로 평가했다.

    • 이는 교착 상태를 방지해준다.
    • 하지만 논리적 스레드를 분기하지 않았다.
      • fork의 의도와 다르다.
      • 하지만, 필요할 때까지 계산을 인스턴스화 하는 것을 지연시키므로, 유용하다.
  • delay라는 이름으로 바꾸겠다.

  • fun <A> delay(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }

콤비네이터를 일반적인 형태로 세분화 해보자.
  • API를 구현하며 새로운 콤비네이터가 필요할 때가 있다.
  • 새 콤비네이터를 만들기 전에 필요한 콤비네이터를 가장 일반적인 형태로 세분화할 수 있는지 생각해봐야한다.
  • 필요한 콤비네이터가 더 일반적인 콤비네이터의 특별한 경우일 수 있기 떄문이다.

세분화하는 과정의 예시를 살펴보자.

fun <ITEM> choice(
    condition: Par<Boolean>, 
    t: Par<ITEM>, 
    f: Par<ITEM>
): Par<ITEM>
  • condition의 결과에 따라 t, f를 실행하고 싶다.
  • condition의 평가를 블록한 상태로 기다리고 그 결과를 통해 어느 한쪽을 실행하도록 처리하면 된다.
fun <ITEM> choice(
    condition: Par<Boolean>, 
    t: Par<ITEM>, 
    f: Par<ITEM>
): Par<ITEM> = { es: ExecutorService -> 
    when (run(es, condition).get()) {
        true -> run(es, t)
        false -> run(es, f)
    }
}
의심해보자.
  • Boolean을 사용하고 두 개 중에 한개만 택한다는 것은 좀 임의적인 것 같다.

  • 두개여야 할 이유는 무엇일까, N개를 받도록 변경해보자.

  • 연습문제 7.10

    • choiceN을 구현하라.

    • choiceN을 이용해 choice를 구현하라.

      fun <ITEM> choiceN(n: Par<Int>, choices: List<Par<ITEM>>): Par<ITEM>
      = { es: ExecutorService ->  
        run(es, choices[run(es, cond).get()])  
      }
의심해보자.
  • 왜 List일까. Map으로 해서 1:1로 찾을 수 있지 않을까?

  • 연습문제 7.11

    • choiceMap 콤비네이터를 구현하라.

      fun <K, V> choiceMap(
      key: Par<K>,
      choices: Map<K, Par<V>>
      ): Par<V> = { es: ExecutorService ->  
      run(es, choices.getValue(run(es, key).get()))  
      }
의심해보자.
  • Map도 너무 임의적이지 않나

  • 실제로 여기서 맵은 (K) -> Par<V>를 반환하는 것만 한다.

  • 모든 케이스를 아우르는 API를 만들자.

  • 연습문제 7.12

    • 새 기본요소 chooser를 구현하라.

    • chooser를 사용해서 choice, choiceN, choiceMap을 구현하라.

      fun <A, B> chooser(pa: Par<A>, choices: (A) -> Par<B>): Par<B>
      = { es: ExecutorService ->  
        run(es, choices(run(es, pa).get()))  
      }

fun choice_2(
cond: Par,
t: Par,
f: Par
): Par = chooser(cond) {
when (it) {
true -> t
false -> f
}
}

fun choiceN_2(
cond: Par,
choices: List
): Par = chooser(cond) {
choices[it]
}

fun <K, V> choiceMap_2(
cond: Par,
choices: Map<K, Par>
): Par = chooser(cond) {
choices.getValue(it)
}


- 시그니처를 자세히 보자.
    - `Par<A>`를 받아서 `Par<B>`를 반환한다. 
    - `flatMap`의 시그니처와 동일하다!

그렇다.
`choice`의 가장 기본연산은 `flatMap`이 었던 것이다.

###### 의심해보자.
- 과연 `flatMap`은 기본연산일까. 더 일반화 하지는 못할까?
- `flatMap`은 두 단계로 세분화 할 수 있다.
    - Mapping
    - Flatten
- Flatten 연산을 `join`이라고 하자. 
    - `join`은 모든 X타입에 대해 `Par<Par<X>>`를 `Par<X>`로 변환한다.
    - 개념적으로 보면, `run`을 호출하면 내부 계산을 실행한 후 내부 계산의 완료를 기다렸다가 결과를 돌려주는 병렬계산이라고 할 수 있다.

- 연습문제 7.13
    - join을 구현하라.
    - join을 써서 flatMap을 구현하라.
    - flatMap을 써서 join을 구현하라.


```kotlin
fun <ITEM> join(item: Par<Par<ITEM>>): Par<ITEM> 
    = { es: ExecutorService ->  
        run(es, run(es, item).get())  
    }  

fun <ITEM, RESULT> flatMap(
    item: Par<ITEM>, 
    block: (ITEM) -> Par<RESULT>
): Par<RESULT> = join(map(item, block))

+ Recent posts