병렬처리하는 법을 알아보자.

예를 들어보자.
[1, 2, 3, 4, 5 ,6, 7, 3, 32, 12, 23, 18, 10] 리스트의 합계를 구해보자.

fold를 사용해 계산한다고 하면, 이렇게 될 것이다.

1 + 2 + ...
3 + 4 + ...
7 + 5 + ...
12 + 6 + ...
18 + 7 + ...
25 + 3 + ...
28 + 32 + ...
60 + 12 + ...
72 + 23 + ...
95 + 18 + ...
113 + 10
123

총 시간은 O(n)만큼 걸린다.
분할 정복을 사용해보자.

[1, 2, 3, 4, 5 ,6, 7,]
[3, 32, 12, 23, 18, 10]
두 리스트의 합으로 볼 수 있다.
더 잘게 쪼개면

[1, 2, 3, 4]
[5 ,6, 7,]
[3, 32, 12]
[23, 18, 10]
이렇게 계속 쪼갤 수 있다.
쪼갠 각 리스트의 합을 모두 더하면 전체 합을 구할 수 있죠.

자 그럼 이제 병렬 처리할 단위를 구할 수 있습니다.
쪼갠 리스트를 각 스레드에 할당해서 계산하면 되겠죠~

사실 이번 장에서는 병렬처리를 어떻게 구현하느냐는 관심없스요.
그냥 병렬처리를 사용하는 함수형 API를 만드는 것이 목적인 것이요.

위에서의 예제를 코드로 봐볼게요.

fun sum(ints: List<Int>): Int =  
    if (ints.size <= 1) {  
        ints.firstOrNull() ?: 0  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        sum(l) + sum(r)  
    }

sum(l) + sum(r) 에서 각 계산을 병렬로 처리해야하죠! 근데 지금은 sum(l)만 주구장창 계산하다. sum(r)을 계산하기 시작할거에요.

그래서 이건 병렬 계산이에요~ 라는 걸 알려줄 자료형을 정해봐요.

Parallel을 줄여서 Par 이라고 할게요.

class Par<ITEM>(val get: ITEM)  

fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())

fun <ITEM> get(item: Par<ITEM>): ITEM = item.get  

병렬계산을 하고 계산 결과를 item에 담는 자료형을 만들었습니다.
예제에 Par을 써보죠

fun sum2(ints: List<Int>): Int =  
    if (ints.size <= 1) {  
        ints.firstOrNull() ?: 0  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        val sumL: Par<Int> = unit { sum2(l) }  
        val sumR: Par<Int> = unit { sum2(r) }  
        sumL.get + sumR.get  
    }

sumL.get + sumR.get 가 호출되기 전까지는 sum(l) , sum(r) 가 평가되지 않는다.
하지만 get을 호출하면 sum(l)이 모두 평가되고 난 후 sum(r)이 순차적으로 평가 될 것이다.

병렬처리를 하고싶어 만들었지만 실제로는 순차처리되는 것이다..

위 코드에서 sumLsumR을 치환해보면 참조투명성을 깨는 것을 볼 수 있다.

unit { sum(l) }.get + unit { sum(r) }.get

결과는 똑같겠지만, unit { sum(l) } 이 모두 평가되어야 unit { sum(r) }이 초기화된다.
위에서 본 것과는 차이가 있다.
다만 get을 사용하지 않는다면 이런 참조 투명성을 해치는 부수효과가 없다.

이 부수효과를 해결하기 위해서 get 호출을 최대한 미뤄야한다.

어떻게 해야할까.

일단 get 호출을 하지 않는다는 것을 전제로하자.

그럼 sum 함수는 Par<Int> 를 반환하는 게 맞다.

fun sum3(ints: List<Int>): Par<Int> =  
    if (ints.size <= 1) {  
        unit { ints.firstOrNull() ?: 0 }  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        map2(sum3(l), sum3(r)) { lx: Int, rx: Int -> lx + rx }  
    }

map2 함수를 만들어서 구현했다.

fun <ITEM1, ITEM2, RESULT> map2(item1: Par<ITEM1>, item2: Par<ITEM2>, block: (ITEM1, ITEM2) -> RESULT): Par<RESULT> =  
    unit { block(item1.get, item2.get) }

이렇게 정의할 수 있다.
재귀적인 경우 더이상 unit을 호출하지 않는다.

자 그럼 이제 논의할 것은 map2의 인자를 지연 계산으로 받을 것이냐이다.

map2가 엄격한 인자를 사용할 때 문제점은 다음과 같다.

sum(listOf(1,2,3,4))

map2(
    sum(listOf(1, 2)),
    sum(listOf(3, 4))
)

map2(
    map2(
        listOf(1),
        listOf(2)
    ),
    sum(listOf(3, 4))
)

map2(
    map2(
        listOf(1),
        listOf(2)
    ),
    map2(
        listOf(3),
        listOf(4)
    ),
)

이렇게 왼쪽을 완전히 계산한 다음 오른쪽을 계산하게 된다.
두 파라미터를 동시에 병렬 계산하지 않는 것이다.

두 파라미터를 동시에 병렬계산 하도록 하려면 map2를 지연계산으로 만들어야한다.
이렇게 결정하고

다음으로 볼 것은 map2는 항상 지연계산일 필요가 있냐는 것이다.
map2(
    unit { 1 },
    unit { 2 }
)

이 경우에는 굳이 병렬로 할 필요가 없다. 오히려 성능이 안좋아진다.
스레드 분기를 명확하게 만들어주자.
다른 스레드에서 실행돼요~ 라는 의미를 가지는 fork 함수를 만들겠어요.

fun sum4(ints: List<Int>): Par<Int> =  
    if (ints.size <= 1) {  
        lazyUnit { ints.firstOrNull() ?: 0 }  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int -> 
            lx + rx }
    }

그렇다면 인자가 적으면 별도의 스레드를 사용하지 않겠다. 는 어떻게 할 수 있을까.

fun sum4(ints: List<Int>): Par<Int> =  
    if (ints.size <= 1) {  
        lazyUnit { ints.firstOrNull() ?: 0 }  
    } else {  
        val l = ints.subList(0, ints.size / 2)  
        val r = ints.subList(ints.size / 2, ints.size)  
        if (l.size <= 2) {
            map2(sum4(l), fork { sum4(r) }) { lx: Int, rx: Int -> 
                lx + rx }
        } else {
            map2(fork { sum4(l) }, fork { sum4(r) }) { lx: Int, rx: Int -> 
                lx + rx }
        }
    }

이렇게하는 걸까.

자. 어쨌든 fork를 써서 "별도의 스레드에서 실행하겠어"라는 것을 만들어줬다.

그럼 unit은 지연 계산할 필요가 있을까?
fun <ITEM> unit(item: () -> ITEM): Par<ITEM> = Par(item())

unit 은 이렇게 만들었다.
앞에서 fork를 만들었으니, unit을 지연 계산하려면, fork를 사용하는 방법을 생각해 볼 수 있다.
그러면 엄격한 unit과 엄격하지 않은 unit을 만들 수 있다.

fun <ITEM> unit(item: ITEM): Par<ITEM> = Par(item)
fun <ITEM> lazyUnit(item: () -> ITEM): Par<ITEM> = 
    fork { unit(item()) }

unit같은 기본형 콤비네이터로, lazyUnit같은 파생형 콤비네이터를 만들어냈다.

스레드 분기는 언제해야할까?
  • fork를 호출했을 때 해야할까?
  • get으로 평가를 했을 때 스레드를 분기해야 할까?
  • fork에 스레드를 분기 책임이 있는 경우
    • fork의 구현은 스레드를 생성하고, 작업을 스레드 풀에 제출하는 방법을 알아야함
    • => 스레드풀 같은 곳에 전역적으로 접근이 가능해야함
    • => fork를 호출하는 시점에 스레드풀 같은 자원이 초기화 되어있어야함
    • 이는 프로그램에 여러 부분에서 원하는 대로 병렬성 전략을 제어할 수 있는 능력을 잃는다는 것
  • *get이 스레드 생성과 실행할 작업을 스레드 풀에 제출하는 책임을 가지는 게 더 적합하다.
    • ??

이제 fork를 통해서 병렬계산을하라고 '표시' 할 수 있고, Par는 해당 계산이 병렬 계산이라는 것을 알려주는 자료형이다.
사실상 fork와 Par는 병렬처리의 내부구현에 대해 알 필요가 없다.
Par는 그냥 값을 담는 컨테이너로만 생각하면 된다.

fun <ITEM> run(item: Par<ITEM>): ITEM = TODO()

run이란 함수를 만들어서 run이 스레드를 시작하거나, 스레드 풀에 제출하거나 하는 등의 방법을 통해 병렬성을 구현하게 한다.

병렬 처리는 어떻게 해야할까?

java.util.concurrent.ExecutorService 클래스를 사용하자.

interface Callable<ITEM> {  
    fun call(): ITEM  
}  

interface Future<ITEM> {  
    fun get(): ITEM  
    fun get(timeout: Long, timeUnit: TimeUnit): ITEM  
    fun cancel(evenIfRunning: Boolean): Boolean  
    fun isDone(): Boolean  
    fun isCancelled(): Boolean  
}  

interface ExecutorService {  
    fun <ITEM> submit(c: Callable<ITEM>): Future<ITEM>  
}

대충 코틀린으로 찌끄려보면 이렇다.

ExecutorService에는 Callable을 제출할 수 있다.
Future는 새로운 드레드에서 실행될 수도 있는 계산을 가리키는 핸들이다.
Future의 블록킹 메소드인 get을 호출해서 결과를 얻어올 수 있다.

ExecutorService를 사용해서 run을 구현해보자.
typealias Par<ITEM> = (ExecutorService) -> Future<ITEM>  
fun <ITEM> run(es: ExecutorService, item: Par<ITEM>): Future<ITEM> 
    = item(es)

Par 타입을 새로 정의하고 ExecutorService를 전달하도록 했다.
ExcutorSevice를 전달해야만 Future가 생긴다.

이제 우리가 만든 API를 탐구하고 다듬어보자.
object Pars {  
    fun <ITEM> unit(item: ITEM): Par<ITEM> = { es -> UnitFuture(item) }  

    data class UnitFuture<ITEM>(val item: ITEM) : Future<ITEM> {  
        override fun get(): ITEM = item  

        override fun get(timeout: Long, timeUnit: TimeUnit): ITEM = item  

        override fun cancel(evenIfRunning: Boolean): Boolean = false  

        override fun isDone(): Boolean = true  

        override fun isCancelled(): Boolean = false  
    }  

    fun <ITEM1, ITEM2, RESULT> map2(  
        item1: Par<ITEM1>,  
        item2: Par<ITEM2>,  
        block: (ITEM1, ITEM2) -> RESULT  
    ): Par<RESULT> = { es ->  
        val future1 = item1(es)  
        val future2 = item2(es)  
        UnitFuture(block(future1.get(), future2.get()))  
    }  

    fun <ITEM> fork(  
        item: () -> Par<ITEM>  
    ): Par<ITEM> = { es: ExecutorService ->  
        es.submit(object : Callable<ITEM> {  
            override fun call(): ITEM = item()(es).get()  
        })  
    }  
}

이렇게 Future의 구현체로 UnitFuture를 만들고 map2fork를 새로운 자료형을 통해 정의했다.

여기서 map2block()의 호출을 별도의 스레드에서 하지 않는다.
병렬성을 제어하려면 fork를 사용해야한다.
block()의 평가를 별도의 스레드에서 수행하고 싶다면 fork로 감싸주자.

Pars.fork {  
    Pars.map2(Pars.unit(1), Pars.unit(2)) { item1, item2 ->  
        item1 + item2  
    }  
}
  • 문제점

    1. map2구현은 타임아웃을 준수하지 않는다.
    2. fork 구현에서 Callable은 내부 작업이 완료될 때 까지 블록된다.
    3. Future에는 순수함수형 인터체이스가 없다.
      • 고로 사용자가 Future를 직접 다루지 못하게 한다.
      • 하지만, Future에 부수효과가 있더라도, 우리가 만든 Par API는 순수함수형으로 남는다. ??
  • 연습문제 7.3

    • Future의 타임아웃을 준수하도록 map2 구현을 수정하라

      fun <ITEM1, ITEM2, RESULT> map2_timeout(  
      item1: Par<ITEM1>,  
      item2: Par<ITEM2>,  
      block: (ITEM1, ITEM2) -> RESULT  
      ): Par<RESULT> = { es ->  
      val future1 = item1(es)  
      val future2 = item2(es)  
      TimedMap2Future(future1, future2, block)   
      }

data class TimedMap2Future<ITEM1, ITEM2, RESULT>(
val item1: Future,
val item2: Future,
val block: (ITEM1, ITEM2) -> RESULT
) : Future {
override fun get(): RESULT {
TODO("Not yet implemented")
}

override fun get(timeout: Long, timeUnit: TimeUnit): RESULT {  
    val timeoutMillis = TimeUnit.MICROSECONDS.convert(timeout, timeUnit)  
    val start = System.currentTimeMillis()  
    val a = item1.get(timeout, timeUnit)  
    val duration = System.currentTimeMillis() - start  
    val remainder = timeoutMillis - duration  
    val b = item2.get(remainder, TimeUnit.MICROSECONDS)  
    return block(a, b)  
}  

override fun cancel(evenIfRunning: Boolean): Boolean {  
    TODO("Not yet implemented")  
}  
override fun isDone(): Boolean {  
    TODO("Not yet implemented")  
}  
override fun isCancelled(): Boolean {  
    TODO("Not yet implemented")  
}  

}


- **연습문제 7.4**
    - lazyUnit을 사용해 (A) -> B 타입의 임의의 함수를 비동기적으로 결과를 평가하는 함수로 변환하는 asyncF를 작성하라.
```kotlin
fun <ITEM, RESULT> asyncF(f: (ITEM) -> RESULT): (ITEM) -> Par<RESULT> =
    { item ->
        lazyUnit { f(item) }
    }
기존 콤비네이터로 더 구현해보자.
sortPar
  • Par<List<Int>>를 정렬해보자.
  1. Par.run()해서 평가하고, 결과 리스트를 받아 정렬한 다음 unit을 이용해 다시 감싸기
  2. map2를 이용해 내부 리스트에 접근하기
fun sortPar(parList: Par<List<Int>>): Par<List<Int>> =  
    map2(parList, unit(Unit)) { list, _ -> list.sorted() }
  • map2를 통해서만 내부의 값을 바꿀 수 있다.

    • 그러니 우선 map2를 써야겠다.
    • 하나의 인자만 필요하니 다른 건 빈값을 넣자.
  • 이제는 Par<A>타입을 받아 Par<B>를 반환하는 map연산을 만들 수 있다.

  • fun <ITEM, RESULT> map(pa: Par<ITEM>, block: (ITEM) -> RESULT): Par<RESULT> = map2(pa, unit(Unit)) { a, _ -> block(a) }

  • 이제 다시, map으로 sortPar를 구현해보자.

  • fun sortPar2(parList: Par<List<Int>>): Par<List<Int>> = map(parList) { it.sorted() }

  • map2map을 구현하였다.

    • 이렇다는 것은, map2가 더 강력한 기본 요소인 것이다.

      parMap
  • 리스트에 대해 병렬로 map을 수행해보자.

fun <ITEM, RESULT> parMap(  
    list: List<ITEM>,  
    block: (ITEM) -> RESULT  
): Par<List<RESULT>> {  
    val fbs: List<Par<RESULT>> = list.map(asyncF(block))  
    TODO()  
}
  • 우선 List<ITEM>List<Par<RESULT>타입으로 바꿔주었다.

  • 그 다음은, List<Par<RESULT>>Par<List<RESULT>>로 바꿔주면 된다.

    • 이 연산은 무엇일까?
      • sequence이다.
  • 연습문제 7.5

    • run을 호출하지 않고 sequence를 작성하여라

      val <T> List<T>.head: T  
      get() = first()  

val List.tail: List
get() = this.drop(1)

val Nil = listOf()

fun sequence1(ps: List): Par =
when (ps) {
Nil -> unit(Nil)
else -> map2(
ps.head,
sequence1(ps.tail)
) { a: ITEM, b: List ->
listOf(a) + b
}
}

fun sequence(ps: List): Par =
when {
ps.isEmpty() -> unit(Nil)
ps.size == 1 -> map(ps.head) { listOf(it) }
else -> {
val l = ps.subList(0, ps.size / 2)
val r = ps.subList(ps.size / 2, ps.size)
map2(sequence(l), sequence(r)) { la, lb ->
la + lb
}
}
}

- 이렇게 2가지 버전으로 기본연산 `sequence`를 만들었다.

- 이제 `parMap`을 구현할 수 있다.
```kotlin
fun <ITEM, RESULT> parMap(  
    list: List<ITEM>,  
    block: (ITEM) -> RESULT  
): Par<List<RESULT>> {  
    val fbs: List<Par<RESULT>> = list.map(asyncF(block))  
    return sequence1(fbs)
}
  • 연습문제 7.6

    • 리스트의 원소를 병렬로 걸러내는 parFilter 구현

      fun <ITEM> parFilter(
      list: List<ITEM>, 
      block: (ITEM) -> Boolean
      ): Par<List<ITEM>> {  
      val pars: List<Par<ITEM>> = list.map { lazyUnit { it } }  
      return map(sequence(pars)) { la ->  
        la.flatMap { a ->  
            if (block(a)) listOf(a) else emptyList()  
        }  
      }
      }
정의한 API의 법칙을 형식화해보자.
  • 매핑(mapping)
  • 논리 스레드 분기(forking)

두 가지에 대해 알아보자.

매핑 법칙
map(unit(1)) { it + 1 } == unit(2)

이 식은 unit(1){ i + 1 }로 매핑한 것이 unit(2)와 같음을 뜻한다.
Par객체가 동치라는 것은 모든 올바른 ExecutorService에 대해 두 Par 객체가 내놓는Future의 결과가 같다는 뜻이다.

이 함수를 일반화 해보자.
map(unit(x), f) == unit(f(x))

이 법칙은 우리에게 몇가지 제약을 준다.

  1. unit 구현은 자신이 받는 값을 들여다 볼 수 없다. 그저 전달할 뿐이다.
    • ExecutorServiceCallable을 전달할 때도 비슷했다. Callable객체에 대한 어떤 가정도 할 수 없고 Callable에 따라 어떤 분기도 할 수 없다. 그저 실행할 뿐이다.
  2. map과 unit의 구현에서 다운캐스팅을 하거나 타입캐스팅을 금지한다.
앞서 본 규칙을 더 단순화해보자.
  • 어떤 x와 f를 사용하든 법칙이 성립해야한다.
  • f가 항등함수라면 어떨까? fun <A> id(a: A):A = a
val x = 1
val y = unit(x)
val f = { a: Int -> a + 1 }
val id = { a: Int -> a }

// 초기 법칙
map(unit(x), f) == unit(f(x))

// f를 id로 바꿔보자.
map(unit(x), id) == unit(id(x))
map(unit(x), id) == unit(x) // id(x) == x
map(y, id) == y // unit(x) == y
  • unit에 대한 언급 없이 map에 대해서만 성립하는 법칙을 만들었다.
  • map이 할 수 있는 일은 f의 결과를 y에 적용시키는 것이다.
    • map은 함수를 적용시키기 전에 예외를 던지거나 비정상 종료를 할 수 없다.
    • f가 항등함수인 id라면 y는 영향받지 않고 반환돼야한다.
      • 이를 map이 구조보존적이어야 한다 라고 말한다.
논리 스레드 분기의 법칙
  • fork는 병렬 계산의 결과에 영향을 끼치면 안된다. 라는 법칙이 있다.
fork { x } = x
  • fork는 주 스레드와 분기된 별도의 논리적 스레드에서 비동기적으로 처리된다는 점을 제외하면 x와 동일하게 일을 해야한다.
  • 이 단순한 속성은 fork 구현을 강하게 제약한다.
    • 이렇게 법칙을 만들었으면 법칙을 깨보려고 시도해봐야한다.
두 Par 인스턴스가 동등한지 검사하는 단언문 함수를 작성해보자.
infix fun <A> Par<A>.shouldBe(other: Par<A>) = { es ->
    if (this(es).get() != other(es).get())
        throw AssertionError("Par instance not equal")
}
  • 이 단언문 메소드를 사용하면 fork 구현에서 발생하는 미묘한 문제를 찾아낼 수 있다.
    • 스레드 풀이 고정된 ExecutorService를 쓰면 교착 상태에 빠지기 쉽다.
val es = Executors.newFixedThreadPool(1)

val a: Par<Int> = lazyUnit { 42 + 1 }
val b: Par<Int> = fork { a }

(a shouldBe b)(es)
  • 위 코드에서는 교착 상태가 발생한다.

  • 왜그럴까?

    • fork의 구현을 보자.

      fun <ITEM> fork(  
      item: () -> Par<ITEM>  
      ): Par<ITEM> = { es: ExecutorService ->  
      es.submit(object : Callable<ITEM> {  
        override fun call(): ITEM = item()(es).get()  
      })  
      }
  • fork 안에서 Callable을 받는다.

  • fork { a }fork { lazyUnit { 42 + 1 } }이고, 이는 fork { fork { unit() } } 이다.

    • fork 안에서 fork를 호출하고 있는것이다.
    • 그러다 보니 받은 ExecutorServiceCallable을 전달해서 평가하는데 그 안에서 또 ExecutorServiceCallable을 제출하는 것이다.
    • 근데 스레드 풀은 1개니까 교착상태가 발생한다.
  • 이렇게 fork 구현에 대한 반례를 찾았다.

    • 해결 방안
      1. 법칙이 성립하도록 구현을 수정
      2. 법칙이 언제 성립하는지를 더 명확히 기술하도록 법칙을 세밀화
        • e.g. 스레드풀은 제한없이 커질 수 있다.
        • 이런 법칙들은 문서화해야함
  • fork를 수정해서 고정 크기의 스레드 풀에서도 정상동작하게 할 수 있을지 보자.

  • fun <A> fork(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }

  • pa를 바로 평가했다.

    • 이는 교착 상태를 방지해준다.
    • 하지만 논리적 스레드를 분기하지 않았다.
      • fork의 의도와 다르다.
      • 하지만, 필요할 때까지 계산을 인스턴스화 하는 것을 지연시키므로, 유용하다.
  • delay라는 이름으로 바꾸겠다.

  • fun <A> delay(pa: () -> Par<A>): Par<A> = { es -> pa()(es) }

콤비네이터를 일반적인 형태로 세분화 해보자.
  • API를 구현하며 새로운 콤비네이터가 필요할 때가 있다.
  • 새 콤비네이터를 만들기 전에 필요한 콤비네이터를 가장 일반적인 형태로 세분화할 수 있는지 생각해봐야한다.
  • 필요한 콤비네이터가 더 일반적인 콤비네이터의 특별한 경우일 수 있기 떄문이다.

세분화하는 과정의 예시를 살펴보자.

fun <ITEM> choice(
    condition: Par<Boolean>, 
    t: Par<ITEM>, 
    f: Par<ITEM>
): Par<ITEM>
  • condition의 결과에 따라 t, f를 실행하고 싶다.
  • condition의 평가를 블록한 상태로 기다리고 그 결과를 통해 어느 한쪽을 실행하도록 처리하면 된다.
fun <ITEM> choice(
    condition: Par<Boolean>, 
    t: Par<ITEM>, 
    f: Par<ITEM>
): Par<ITEM> = { es: ExecutorService -> 
    when (run(es, condition).get()) {
        true -> run(es, t)
        false -> run(es, f)
    }
}
의심해보자.
  • Boolean을 사용하고 두 개 중에 한개만 택한다는 것은 좀 임의적인 것 같다.

  • 두개여야 할 이유는 무엇일까, N개를 받도록 변경해보자.

  • 연습문제 7.10

    • choiceN을 구현하라.

    • choiceN을 이용해 choice를 구현하라.

      fun <ITEM> choiceN(n: Par<Int>, choices: List<Par<ITEM>>): Par<ITEM>
      = { es: ExecutorService ->  
        run(es, choices[run(es, cond).get()])  
      }
의심해보자.
  • 왜 List일까. Map으로 해서 1:1로 찾을 수 있지 않을까?

  • 연습문제 7.11

    • choiceMap 콤비네이터를 구현하라.

      fun <K, V> choiceMap(
      key: Par<K>,
      choices: Map<K, Par<V>>
      ): Par<V> = { es: ExecutorService ->  
      run(es, choices.getValue(run(es, key).get()))  
      }
의심해보자.
  • Map도 너무 임의적이지 않나

  • 실제로 여기서 맵은 (K) -> Par<V>를 반환하는 것만 한다.

  • 모든 케이스를 아우르는 API를 만들자.

  • 연습문제 7.12

    • 새 기본요소 chooser를 구현하라.

    • chooser를 사용해서 choice, choiceN, choiceMap을 구현하라.

      fun <A, B> chooser(pa: Par<A>, choices: (A) -> Par<B>): Par<B>
      = { es: ExecutorService ->  
        run(es, choices(run(es, pa).get()))  
      }

fun choice_2(
cond: Par,
t: Par,
f: Par
): Par = chooser(cond) {
when (it) {
true -> t
false -> f
}
}

fun choiceN_2(
cond: Par,
choices: List
): Par = chooser(cond) {
choices[it]
}

fun <K, V> choiceMap_2(
cond: Par,
choices: Map<K, Par>
): Par = chooser(cond) {
choices.getValue(it)
}


- 시그니처를 자세히 보자.
    - `Par<A>`를 받아서 `Par<B>`를 반환한다. 
    - `flatMap`의 시그니처와 동일하다!

그렇다.
`choice`의 가장 기본연산은 `flatMap`이 었던 것이다.

###### 의심해보자.
- 과연 `flatMap`은 기본연산일까. 더 일반화 하지는 못할까?
- `flatMap`은 두 단계로 세분화 할 수 있다.
    - Mapping
    - Flatten
- Flatten 연산을 `join`이라고 하자. 
    - `join`은 모든 X타입에 대해 `Par<Par<X>>`를 `Par<X>`로 변환한다.
    - 개념적으로 보면, `run`을 호출하면 내부 계산을 실행한 후 내부 계산의 완료를 기다렸다가 결과를 돌려주는 병렬계산이라고 할 수 있다.

- 연습문제 7.13
    - join을 구현하라.
    - join을 써서 flatMap을 구현하라.
    - flatMap을 써서 join을 구현하라.


```kotlin
fun <ITEM> join(item: Par<Par<ITEM>>): Par<ITEM> 
    = { es: ExecutorService ->  
        run(es, run(es, item).get())  
    }  

fun <ITEM, RESULT> flatMap(
    item: Par<ITEM>, 
    block: (ITEM) -> Par<RESULT>
): Par<RESULT> = join(map(item, block))

Random 함수를 생각해보자.

import kotlin.random.Random

fun main() {
    val rng = Random
    rng.nextInt()
}

rng.nextInt()를 할 때마다 다른 값이 나온다. 
왜일까? rng는 그대로인데..

Random 안에서 어떤 "상태"를 변경하기 때문이다. 

다들 알다시피 함수형 프로그램에서 이런 가변 상태는 부수효과이고, 참조투명하지 않다. 

또한 테스트 하기도 어려워진다

fun rollDice(): Int {
    val rng = Random
    return rng.nextInt(6)
}

이런 주사위를 던지는 함수를 생각해보자. nextInt(6)을 했지만, 실제로는 0~5까지의 범위가 나온다. 
잘못 작성한 함수라는 것이다. 

1~5 범위의 수가 나오면 사용자는 오류인지 모를것이다. 따라서 1/6의 확률로 오류가 발생하는 것이다. 
일반적으로는 이런 함수보다 오류 상황을 더 예측하기 힘들다. 그리고 오류가 발생했을 때 안정적으로 재생산을 할 수 있어야 한다. 

이번 장에서는 함수형 프로그래밍에서 이런 상태 전이를 처리하는 법을 알아본다.

명령형 해법이 아닌 순수 함수형 방식으로 말이다!

순수 함수형 난수 생성기

부수효과를 제거하고, 참조투명성을 회복시켜보자.

이제부터는 "상태 갱신"을 명시화 할 것이다. 위에서 본 kotlin의 Random처럼 내부의 상태가 아무도 모르게 변하는 것이 아니라 사용자가 알 수 있게 하겠다는 뜻이다.

interface RNG {
    fun nextInt(): Pair<Int, RNG>
}

이 인터페이스를 보자. 반환을 Pair로한다. Int는 랜덤 생성된 난수이고, RNG는 다음 상태를 말한다.
즉, 난수를 생성하면 난수 값과 새 상태를 함께 돌려주는 것이다. 

이 인터페이스를 구현해보자. linear congruential generator라는 알고리듬을 사용할 것이다.

data class SimpleRNG(val seed: Long) : RNG {
    override fun nextInt(): Pair<Int, RNG> {
        val newSeed = (seed * 0x5DEECE66DL + 0xBL) and 0xFFFFFFFFFFFFL
        val nextRNG = SimpleRNG(newSeed)
        
        val n = (newSeed ushr 16).toInt()
        return n to nextRNG
    }
}
  • 이 명령은 원하는 만큼 반복해서 실행할 수 있다.
  • 같은 시드 값에 대해서는 항상 같은 값을 얻는다.
  • 결과로 나온 RNG에 대해서도 같은 난수를 얻는다.

상태가 있는 API를 순수 함수형 API로 만들기

위에서는 난수 생성기를 순수 함수형 API로 만들어 봤다.

이렇게 API가 내부의 무언가를 변이 시키는 대신 다음 상태를 계산하게 만드는 문제는 다른 문제에도 사용할 수 있다. 

class MutatingSequencer {
    private var repo: Repository = TODO()
    fun nextInt(): Int = TODO()
    fun nextDouble(): Double = TODO()
}

어떤 수 시퀀스를 만드는 Repository가 있다고 하자.
nextInt()와 nextDouble()은 repo를 통해 값을 받아와 출력할 것이다. 그리고 각각의 방식으로 repo의 상태를 변경할 것이다.(인덱스에 +1을 한다거나 등)

이런 부수효과가 있는 함수를 순수 함수형으로 만드려면? 
위에서도 말했죠! 상태 갱신을 명시화

interface StateActionSequencer {
    fun nextInt(): Pair<Int, StateActionSequencer>
    fun nextDouble(): Pair<Double, StateActionSequencer>
}

현재 값과, 다음 상태를 반환합니다. 이러믄 다음 상태를 전달할 책임을 호출자 쪽에서 져야합니다.

연습문제 6.1

fun nonNegativeInt(rng: RNG): Pair<Int, RNG> {
    val cur = rng.nextInt()
    return if (cur.first < 0) {
        if (cur.first == Int.MIN_VALUE) {
            Int.MAX_VALUE to cur.second
        } else {
            abs(cur.first) to cur.second
        }
    } else {
        cur
    }
}

fun nonNegativeInt2(rng: RNG): Pair<Int, RNG> {
    val (value, next) = rng.nextInt()
    return (if (value < 0) -(value + 1) else value) to next
}

연습문제 6.2

fun double(rng: RNG): Pair<Double, RNG> {
    val (value, next) = nonNegativeInt(rng)
    return (value / (Int.MAX_VALUE.toDouble() + 1)) to next
}

연습문제 6.3

fun intDouble(rng: RNG): Pair<Pair<Int, Double>, RNG> {
    val (i, rng1) = rng.nextInt()
    val (d, rng2) = double(rng1)
    return (i to d) to rng2
}

fun doubleInt(rng: RNG): Pair<Pair<Double, Int>, RNG> {
    val (d, rng1) = double(rng)
    val (i, rng2) = rng1.nextInt()
    return (d to i) to rng2
}

fun double3(rng: RNG): Pair<Triple<Double, Double, Double>, RNG> {
    val (d, rng1) = double(rng)
    val (d1, rng2) = double(rng1)
    val (d2, rng3) = double(rng2)
    return Triple(d, d1, d2) to rng3
}

연습문제 6.4

fun ints(count: Int, rng: RNG): Pair<List<Int>, RNG> {
    if (count == 0) return Nil to rng
    val (i, rng1) = rng.nextInt()
    val (list, rng2) = ints(count - 1, rng1)
    return Cons(i, list) to rng2
}

상태 동작을 전달하는 암시적 접근 방법

앞에서의 순수 함수형 해법은 뭐였지. "상태 갱신 명시화" 였다.

이 해법은 부수효과를 피할 수 있지만, 순차적이며, 오류를 저지르기 쉽고, 번거롭다.

지금까지 공통되는 패턴은  (RNG) -> Pair<A, RNG> 이었다. 모든 함수가 이 패턴을 따라 만들어졌다.

처음의 RNG 상태에서 다른 RNG 상태를 반환한다. 이를 상태 전이/동작이라고 한다.

상태 전이/동작은 콤비네이터를 통해 조합할 수 있다. 직접 상태를 반복적으로 처리하지 않고 콤비네이터가 자동으로 다음 동작을 하게 만들어보자.

typealias Rand<A> = (RNG) -> Pair<A, RNG>

// 함수를 반환
val intR: Rand<Int> = { rng -> rng.nextInt() }

Rand<A>라는 타입을 만들었다. 이는 함수 타입이다. 

intR은 함수를 반환하고, intR(SimpleRNG(10)) 이런 식으로 사용할 수 있다.

fun <A> unit(a: A): Rand<A> = { rng -> a to rng }

fun <A, B> map(s: Rand<A>, f: (A) -> B): Rand<B> = { rng ->
    val (a, rng2) = s(rng)
    f(a) to rng2
}

이렇게 상태를 사용하지 않고 파라미터로 받은 상수를 반환하는 unit함수와 상태를 변경하지 않으면서 출력을 변환하는 map도 있다. 

연습문제 6.5

fun doubleR(): Rand<Double> =
    map(::nonNegativeInt) { value ->
        value / (Int.MAX_VALUE.toDouble() + 1)
    }

상태 동작 조합을 통해 더 큰 능력 발휘하기

상태 전이를 감추고 단일 상태 동작을 다루는 API를 개발하였다. 

하지만 map은 intDouble, doubleInt 같은 함수를 구현할 수 없다. 단항 함수가 아니라 이항 함수를 사용해 두 가지 RNG 동작을 조합할 수 있는 map2를 만들자.

연습문제 6.6

fun <ITEM1, ITEM2, RESULT> map2(
    ra: Rand<ITEM1>,
    rb: Rand<ITEM2>,
    f: (ITEM1, ITEM2) -> RESULT
): Rand<RESULT> = { rng ->
    val (item1, rng1) = ra(rng)
    val (item2, rng2) = rb(rng1)
    
    f(item1, item2) to rng2
}

map2를 활용해 both 함수를 만들 수 있다.

fun <ITEM1, ITEM2> both(ra: Rand<ITEM1>, rb: Rand<ITEM2>): Rand<Pair<ITEM1, ITEM2>> =
    map2(ra, rb) { a, b -> a to b }

val intDoubleR: Rand<Pair<Int, Double>> = both(intR, doubleR)
val doubleIntR: Rand<Pair<Double, Int>> = both(doubleR, intR)

both를 사용해 intDouble, doubleInt를 만들 수 있다.

연습문제 6.7

fun <ITEM> sequence(fs: List<Rand<ITEM>>): Rand<List<ITEM>> = { rng ->
    when (fs) {
        is Nil -> Nil to rng
        is Cons -> {
            val (item, rng1) = fs.head(rng)
            val (list, rng2) = sequence(fs.tail)(rng1)
            Cons(item, list) to rng2
        }
    }
}

fun <ITEM> sequenceFold(fs: List<Rand<ITEM>>): Rand<List<ITEM>> = { rng ->
    foldRight(fs, Nil to rng) { head: Rand<ITEM>, tail: Pair<List<ITEM>, RNG> ->
        val (item, rng1) = head(rng)
        val (list, _) = tail
        Cons(item, list) to rng1
    }
}

fun <ITEM> sequenceFold2(fs: List<Rand<ITEM>>): Rand<List<ITEM>> =
    foldRight(fs, unit(Nil)) { item: Rand<ITEM>, acc: Rand<List<ITEM>> ->
        map2(item, acc) { h, t ->
            Cons(h, t)
        }
    }

상태 동작을 내포시켜서 재귀적으로 재시도하기

우리는 위에서 부터 랜덤 수를 생성하는 예제를 개선해왔다. 

1. kotlin.math.Random 객체의 내부 상태를 변이

2. 상태 동작을 명시적으로 전달하는 방식 ((RNG) -> Pair<ITEM, RNG>)

3. 상태 전이를 뒤에 감추는 방식 (Rand<ITEM>)

이런 단계를 거쳤다.

그리고 map, map2 콤비네이터를 이용해서 문제를 쉽게 해결할 수 있었다.
하지만 map, map2로도 쉽게 작성할 수 없는 함수도 있다.

0이상 n 미만의 정수를 랜덤 생성해주는 nonNegativeLessThan 함수를 작성해보자.

fun nonNegativeLessThan(n: Int): Rand<Int> =
	map(::nonNegativeInt) { it % n }

이렇게 작성할 수 있겠다. 이 함수는 우리가 원하는 범위의 값을 생성하지만, 나오는 수의 분포가 치우쳐져 있다.
Int.MAX_VALUE가 n의 배수여야 고른 분포가 나온다

무슨말이냐면? 예를 들어 Int.MAX_VALUE가 8이라고 해보자. 가정이다.
nonNegativeLessThan을 하면 -> 12340123 이런 분포이다. 1,2,3 은 나올 확율이 1/4이지만, 4, 0은 1/8인 것을 볼 수 있다.
8 % 5 = 3, 즉 Int.MAX_VALUE % n보다 작은 수는 좀 더 자주 발생한다.

그러니, 랜덤으로 뽑은 수가 n의 배수 중 최대값을 넘는다면 (6, 7, 8 이 나온다면) 더 작은 수를 얻기 위해 재시도 해야한다. 

fun nonNegativeLessThan(n: Int): Rand<Int> =
    map(::nonNegativeInt) { i ->
        val mod = i%n
        if (i + (n - 1) - mod >= 0) mod
        else nonNegativeLessThan(n) { i2 -> ?? }
    }

재귀를 써야하지만, Rand<Int>를 반환해야하는데, map을 사용해서는 할 수가 없다.

fun nonNegativeLessThan(n: Int): Rand<Int> = { rng ->
    val (i, rng2) = nonNegativeInt(rng)
    val mod = i % n
    if (i + (n - 1) - mod >= 0) {
        mod to rng
    } else {
        nonNegativeLessThan(n)(rng2)
    }
}

map을 사용해서 상태 이전을 감추는 대신 명시적으로 RNG를 반환받아 전달할 수 있다. 
하지만 우리는 여전히 상태 이전을 감추고 싶다. 새로운 콤비네이터 flatMap을 만들어보자.

연습문제 6.8

fun <ITEM, RESULT> flatMap(f: Rand<ITEM>, g: (ITEM) -> Rand<RESULT>): Rand<RESULT> = { rng ->
    val (item, rng1) = f(rng)
    g(item)(rng1)
}

fun nonNegativeLessThan2(n: Int): Rand<Int> =
    flatMap(::nonNegativeInt) { item: Int ->
        val mod = item % n
        
        if (item + (n - 1) - mod >= 0) {
            unit(mod)
        } else {
            nonNegativeLessThan2(n)
        }
    }

연습문제 6.9

fun <ITEM, RESULT> mapF(s: Rand<ITEM>, f: (ITEM) -> (RESULT)): Rand<RESULT> =
    flatMap(s) { item ->
        unit(f(item))
    }

fun <ITEM1, ITEM2, RESULT> map2F(ra: Rand<ITEM1>, rb: Rand<ITEM2>, f: (ITEM1, ITEM2) -> RESULT): Rand<RESULT> =
    flatMap(ra) { item1 ->
        mapF(rb) { item2 ->
            f(item1, item2)
        }
    }

일반적인 상태 동작 타입

지금까지 난수 생성에 대한 코드만 주구장창 봤다. 이거 어떻게 다른 문제에 적용할 수 있을까?

임의의 상태 동작에 대해 적용할 수 있는 타입과 함수로 만들어보자.

data class State<STATE, out ITEM>(val run: (STATE) -> Pair<ITEM, STATE>)

typealias Rand2<ITEM> = State<RNG, ITEM>

연습문제 6.10

fun <STATE, ITEM> unitS(item: ITEM): State<STATE, ITEM> = State { state ->
    item to state
}

fun <STATE, ITEM, RESULT> flatMapS(f: State<STATE, ITEM>, g: (ITEM) -> State<STATE, RESULT>): State<STATE, RESULT> =
   State { state -> 
       val (item, state1) = f.run(state)
       g(item).run((state1))
   }

fun <STATE, ITEM, RESULT> mapS(s: State<STATE, ITEM>, f: (ITEM) -> RESULT): State<STATE, RESULT> = 
    flatMapS(s) { item ->
        unitS(f(item))
    }

fun <STATE, ITEM1, ITEM2, RESULT> map2S(s1: State<STATE, ITEM1>, s2: State<STATE, ITEM2>, f: (ITEM1, ITEM2) -> RESULT): State<STATE, RESULT> =
    flatMapS(s1) { item1 ->
        mapS(s2) { item2 ->
            f(item1, item2)
        }
    }

fun <STATE, ITEM> sequenceS(fs: List<State<STATE, ITEM>>): State<STATE, List<ITEM>> = 
    foldRight(fs, unitS<STATE, List<ITEM>>(Nil)) { item, acc ->
        map2S(item, acc) { h, t ->
            Cons(h, t)
        }
    }

 

코틀린은 기본적으로 엄격한 평가를 수행합니다. 

엄격하다는 것이 무엇일까요?

함수를 호출하기 전에 함수 파라미터를 완전히 평가한다는 뜻입니다. 
엄격한 평가에서 식은 식이 변수에 바운드되는 순간에 평가됩니다. 

fun main() {
	get(1+2, 10.0.pow(300))
}

fun get(a: Int, b: Double) {
	//...
}

요런 코드가 있을 때, get 함수를 호출하는 시점에 1+2, 10.0.pow(300)은 평가됩니다. 

평가하는 과정이 너무 복잡하고 비싸면 어떻게 할까?

리스트에 모든 요소에 평가를 해야하는데, 실제로는 몇개의 요소만 사용한다면 이러한 연산은 비효율적입니다. 

지연 평가를 사용하면 이 비효율을 줄일 수 있습니다. 

지연 평가는 값을 선언한(바운딩된) 시점이 아니라 실제 참조하는 시점에 계산이 이루어집니다.
필요에 따라 값을 계산하는 것입니다.

코틀린의 비효율적인 코드

List.of(1, 2, 3, 4).map { it + 10 }.filter { it % 2 == 0 }.map { it * 3 }

map, filter, map의 각 단계 마다 중간 리스트를 만들어낸다. 다음 변환의 입력으로 쓰인 후 즉시 버려질 임시 리스트를 만들어내는 것이다. 

지연 연산을 통해 이런 변환의 시퀀스를 융합해 한 번의 처리 단계로 끝낼 수 있다. 

5.1 엄격한 함수와 엄격하지 않은 함수

엄격성과 비엄격성에 대해 알아보고, 엄격성에 대한 형식적 정의를 하겠습니다. 

엄격한 함수

  • 대부분 프로그래밍 언어의 표준
  • 항상 모든 인자를 평가

엄격하지 않은 함수(비엄격성)

  • 함수가 하나 이상의 인자를 평가하지 않기로 선택했다는 뜻

 

엄격한 함수

fun square(x: Double): Double = x * x

square(41.0 + 1.0)
square(exitProcess(-1))

이렇게 함수를 호출하면, 41.0 + 1.0을 계산해서 42.0을 전달받고,
square함수블럭에 들어가기 전에 exitProcess()가 호출되어 프로그램이 종료될 것입니다. 

이는 함수가 호출되기 전에 인자에 대해 평가가 일어난 것이고 이를 엄격한 함수라고 합니다. 

비엄격성의 예시

우리가 이미 쓰고있던 문법에도 비엄격성이 녹아있습니다. 

쇼트 서킷이라고 불리는 &&, || 같은 경우는 엄격하지 않습니다.
&&의 경우 첫 번째 인자가 true일 때만 두 번째 인자를 평가합니다. 
||의 경우 첫 번쨰 인자가 false일 때만 두 번째 인자를 평가합니다. 

또 if도 엄격하지 않습니다. 

val result = if (input.isEmpty()) exitProcess(-1) else input

if를 삼항연산자처럼 사용할 수 있습니다. 
여기서 조건식에 따라 참 거짓중 하나는 평가하지 않습니다. 모든 인자를 평가하지 않기 때문에 엄격하지 않다고 할 수 있습니다. 

다만 조건식에 대해서는 엄격하다고 할 수 있습니다. 

코틀린에서 엄격하지 않은 함수 작성하기

코틀린에서 엄격하지 않은 함수를 어떻게 작성할까요? 다른말로, 득정 인자를 평가하지 않겠다고 어떻게 표현할까요?

따로 표현하는 방법은 없고, () -> A 이러한 함수 타입을 사용해서 평가하지 않은 값을 표현합니다. 

fun maybeTwice(b: Boolean, i: () -> Int) =
	if (b) i() + i() else 0

이 함수에서 i 는 함수 타입이고, 이는 평가하지 않는 값입니다.. 

i를 보통 thunk라고 부르고, 함수 안에서 thunk를 실행하여 식을 평가한 결과를 얻을 수 있습니다. 

val x = maybeTwice(true) { 1 + 41 }

그런데 위 함수처럼 thunk를 여러번 실행한다면, 매번 평가가 일어납니다. 
i()를 호출할 때만다 1+41 연산이 수행된다는 것 입니다. 연산이 복잡하고 오래걸린다면 이 또한 골칫거리입니다. 

그래서 thunk를 한 번만 실행하고 값을 캐싱해서 사용하는 방법이 있습니다. 

fun maybeTwice2(b: Boolean, i: () -> Int) {
	val j: Int by lazy(i)
    if (b) j + j else 0
}

j라는 변수에 i의 결과를 캐싱했습니다. 
i를 실행하는 시점은 j를 참조하는 순간입니다. 이는 lazy 내장함수를 통해 가능합니다. 

fun maybeTwice3(b: Boolean, i: () -> Int) {
    val j: Int = lazy(i).value
    println("before if")
    if (b) j + j else 0
}

fun maybeTwice4(b: Boolean, i: () -> Int) {
    val j: Int = i()
    println("before if")
    if (b) j + j else 0
}

fun maybeTwice5(b: Boolean, i: () -> Int) {
    val j: Lazy<Int> = lazy(i)
    println("before if")
    if (b) j.value + j.value else 0
}

이런식으로도 작성할 수 있습니다. 

단, maybeTwice3, 4의 경우는 지연 초기화는 일어나지 않고 단순 캐싱만 일어납니다.
maybeTwice5같이 위임을 사용하지 않고도 지연 초기화를 할 수 있습니다. 

엄격성의 형식적인 정의

어떤 식을 평가했는데 결과를 반환하는 대신 영원히 실행되거나, 오류를 던지는 경우를 생각해봅시다.

fun maybe(value: Int) {
	//...
}

fun something1(): Int {
	while(true) {
    }
	return 0
}

fun something2(): Int {
	throw Exception()
    return 0;
}

maybe(something1())
maybe(something2())

이런 경우 일 것입니다. 이럴 떄 식이 종료되지 않는다는 뜻으로, "바텀으로 평가된다"라고 합니다.

엄격성이란.
어떤 식 x가 바텀으로 평가될 때 함수 f도 항상 바텀으로 평가된다면 이 함수 f를 엄격한 함수라고 말한다.

something1()과 something2()가 바텀으로 평가되는데, maybe(something1())도 바텀으로 평가되므로, maybe()는 엄격한 함수이다. 

5.2 확장 예제: 지연 리스트

List.of(1, 2, 3, 4).map { it + 10 }.filter { it % 2 == 0 }.map { it * 3 }

앞에서 이 예제를 얘기하며 엄격한 연산의 비효율성에 대해 얘기했었습니다.
각 단계를 수행하며 매번 새로운 리스트가 만들어져 비효율적이라는 게 문제였습니다. 
스트림을 사용하여 연쇄적인 변환이 지연 연산을 통해 하나의 단계로 융합하여 문제를 해결할 수 있습니다.

또한 지연 연산을 사용하면 함수형 프로그램의 효율을 높이고 모듈화를 촉진할 수 있습니다. 

sealed class Stream<out ITEM>

data class Cons<out ITEM>(
    val head: () -> ITEM,
    val tail: () -> Stream<ITEM>
) : Stream<ITEM>()

object Empty : Stream<Nothing>()

Stream이라는 새로운 자료형을 정의했습니다. 보기에 List와 비슷해보이지만, Cons의 생성자에 엄격한 값이 아니라 thunk를 받습니다.
2023.09.10 - [Programming/코틀린 함수형 프로그래밍] - 3. 함수형 데이터 구조 - Functional Programming in Kotlin

Stream을 관찰하거나 순회하려면, thunk를 강제로 평가해야 합니다. 

fun <ITEM> Stream<ITEM>.headOption(): Option<ITEM> =
    when(this) {
        is Empty -> None
        is Cons -> Some(head())
    }

Cons라면 Some(head())를 반환합니다. head()를 계산하는 과정이 있지만, 그 외에는 List와 똑같이 작동합니다. 

여기서는 tail()을 계산하지 않았습니다. 이런식으로 필요한 부분만 계산할 수 있는 것도 장점입니다. 

5.2.1 스트림을 메모이제이션해서 재계산 피하기

복잡한 계산은 피하는게 좋다. 메모이제이션은 어떤 식을 최초로 평가한 결과를 캐시에 넣고, 다음부터는 재사용하는 것이다. 

Cons 노드에 thunk를 강제 계산하고 나면 캐싱을 해두고 싶다. 하지만 지금 구조에서는 그렇지 않다.

val x = Cons({ println("cons"); 1}, { Empty })
val h1 = x.headOption()
val h2 = x.headOption()

이 코드에서는 head thunk를 2번 평가한다. 

메모이제이션을 하려면 일반적으로 스마트 생성자를 사용한다. 

data class Cons<out ITEM>(
    val head: () -> ITEM,
    val tail: () -> Stream<ITEM>
) : Stream<ITEM>() {
    companion object {
        fun <ITEM> cons(head: () -> ITEM, tail: () -> Stream<ITEM>): Stream<ITEM> {
            val head: ITEM by lazy(head)
            val tail: Stream<ITEM> by lazy(tail)
            return Cons({ head }, { tail })
        }
    }
}

관습적으로 클래스의 companion object에 위치하고, 클래스 이름의 첫글자를 소문자로 바꿔 함수명을 짓는다.
이 스마트 생성자를 통해 객체를 생성하면 head, tail이 변수에 캐싱되어, 처음 thunk가 실행될 때 한 번만 평가한다. 

sealed class Stream<out ITEM> {
    companion object {
        fun <ITEM> of(vararg xs: ITEM): Stream<ITEM> =
            if (xs.isEmpty()) Empty.empty()
            else Cons.cons({ xs[0] }, { of(*xs.sliceArray(1..<xs.size)) })
    }
}

object Empty : Stream<Nothing>() {
    fun <ITEM> empty(): Stream<ITEM> = Empty
}

Empty에 대한 스마트 생성자도 만들고, Stream을 of를 통해 생성할 수 있도록 만들었다.

5.2.2 스트림 관찰을 위한 도우미 함수

연습문제 5.1 ~ 3

fun <ITEM> Stream<ITEM>.toList(): List<ITEM> =
    when (this) {
        is Empty -> Nil
        is Cons -> funfun.study.kfp.chap3.Cons(head(), tail().toList())
    }
    
fun <ITEM> Stream<ITEM>.take(n: Int): Stream<ITEM> =
    if (n == 0) Empty
    else when (this) {
        is Empty -> Empty
        is Cons -> Cons(head, { tail().take(n - 1) })
    }

fun <ITEM> Stream<ITEM>.drop(n: Int): Stream<ITEM> =
    if (n == 0) this
    else when (this) {
        is Empty -> Empty
        is Cons -> tail().drop(n - 1)
    }
    
fun <ITEM> Stream<ITEM>.takeWhile(p: (ITEM) -> Boolean): Stream<ITEM> =
    when (this) {
        is Empty -> Empty
        is Cons -> if (p(head())) Cons(head, { tail().takeWhile(p) }) else Empty
    }

5.3 프로그램 기술과 평가 분리하기

앞에서부터 계속 관심사 분리를 해왔다. 

어떤 계산을 할지와 계산을 수행하는 것을 분리하고, 오류에 대한 포획과 오류에 대한 처리를 분리했다. 

지연 계산을 사용하면 식에 대한 기술(description)과 식에 대한 평가를 분리할 수 있다.

비용이 큰 커다란 식을 기술하고 일부만 평가하도록 선택하는 강력한 능력을 가질 수 있다. 

fun <ITEM> Stream<ITEM>.exists(p: (ITEM) -> Boolean): Boolean =
    when (this) {
        is Cons -> p(head()) || this.tail().exists(p)
        else -> false
    }

exists함수는 stream안에 어떤 Boolean 함수를 만족하는 원소가 있는지 검사한다.

여기서 ||는 두 번째 인자에 대해 엄격하지 않다. p(head())가 true라면 뒤는 평가하지 않는다. 그렇다면 또한 tail 역시 평가하지 않는다는 것이다 

fun <ITEM, RESULT> Stream<ITEM>.foldRight(
    default: () -> RESULT,
    combine: (ITEM, () -> RESULT) -> RESULT
): RESULT =
    when (this) {
        is Cons -> combine(head(), { tail().foldRight(default, combine) })
        is Empty -> default()
    }

fun <ITEM> Stream<ITEM>.exists2(p: (ITEM) -> Boolean): Boolean =
    foldRight({ false }, { a, b -> p(a) || b() })

명시적 재귀를 사용하지 않고, foldRight를 사용하여 재귀를 구현할 수도 있다.

연습문제 5.4 ~ 7

fun <ITEM> Stream<ITEM>.forAll(condition: (ITEM) -> Boolean): Boolean =
    when (this) {
        is Cons -> if (condition(head())) tail().forAll(condition) else false
        is Empty -> true
    }

fun <ITEM> Stream<ITEM>.forAll2(condition: (ITEM) -> Boolean): Boolean =
    foldRight({ true }, { head, tail -> condition(head) && tail() })



fun <ITEM> Stream<ITEM>.takeWhile2(p: (ITEM) -> Boolean): Stream<ITEM> =
    foldRight(
        { Empty },
        { head, tail: () -> Stream<ITEM> ->
            if (p(head)) Cons({ head }, { tail().takeWhile2(p) }) else Empty
        }
    )

fun <ITEM> Stream<ITEM>.headOption2(): Option<ITEM> =
    foldRight({ None }, { head, tail: () -> Option<ITEM> ->
        Some(head)
    })


fun <ITEM, RESULT> Stream<ITEM>.map(f: (ITEM) -> RESULT): Stream<RESULT> =
    foldRight({ Empty.empty() }, { head, tail: () -> Stream<RESULT> ->
        Cons({ f(head) }, tail)
    })

fun <ITEM> Stream<ITEM>.filter(f: (ITEM) -> Boolean): Stream<ITEM> =
    foldRight({ Empty.empty() }) { h, t ->
        if (f(h)) Cons({ h }, t) else t()
    }

fun <ITEM> Stream<ITEM>.append(sa: () -> Stream<ITEM>): Stream<ITEM> =
    foldRight(sa) { h, t ->
        Cons({ h }, t)
    }

5.4 공재귀 함수를 통해 무한한 데이터 스트림 생성하기

지금까지 작성한 함수들은 무한 스트림에 대해서도 작동한다. 
무한 스트림도 점진적으로 계산이 이뤄지기 때문이다.

fun ones(): Stream<Int> = Cons.cons({ 1 }, { ones() })

무한 시퀀스를 생성하는 코드이다. 

ones().take(5).toList()
ones().exists { it % 2 != 0 }
ones().map { it + 1 }.exists { it % 2 == 0 }
ones().takeWhile { it == 1 }

이런 연산들을 그대로 사용할 수 있다. 

하지만 아래와 같은 연산들은 stack overflow가 날 수 도 있다. 무한 스트림을 영원히 검사하기 때문이다. 

ones().forAll2 { it == 1 }
ones().takeWhile { it == 1 }.toList()
ones().exists { it % 2 == 0 }

연습문제

5.8

fun <ITEM> constant(item: ITEM): Stream<ITEM> = Cons.cons({ item }, { constant(item) })

5.9

fun from(n: Int): Stream<Int> = Cons.cons({ n }, { from(n + 1) })

5.10

fun fibs(): Stream<Int> {
    fun go(a: Int, b: Int): Stream<Int> {
        return Cons.cons({ a }, { go(b, a + b) })
    }
    return go(0, 1)
}

5.11

fun <A, S> unfold(z: S, f: (S) -> Option<Pair<A, S>>): Stream<A> =
    when (val s = f(z)) {
        is None -> Empty
        is Some -> {
            val (cur, next) = s.get
            Cons.cons({ cur }, { unfold(next, f) })
        }
    }

5.12

fun ones2(): Stream<Int> = unfold(1) {
    Some(1 to 1)
}

fun <A> constant2(a: A): Stream<A> =
    unfold(a) {
        Some(a to a)
    }

fun from2(n: Int): Stream<Int> =
    unfold(n) {
        Some(it to it + 1)
    }

fun fibs2(): Stream<Int> =
    unfold(0 to 1) { (cur, next) ->
        Some(cur to (next to (cur + next)))
    }

5.13

fun <ITEM, RESULT> Stream<ITEM>.map2(f: (ITEM) -> RESULT): Stream<RESULT> =
    unfold(this) {
        when (it) {
            is Empty -> None
            is Cons -> {
                Some(f(it.head()) to it.tail())
            }
        }
    }

fun <ITEM> Stream<ITEM>.take2(n: Int): Stream<ITEM> =
    unfold(n to this) { (num, item) ->
        if (num == 0) None
        else when (item) {
            is Empty -> None
            is Cons -> Some(item.head() to (num - 1 to item.tail()))
        }
    }

fun <ITEM> Stream<ITEM>.take3(n: Int): Stream<ITEM> =
    unfold(this) { item ->
        when (item) {
            is Empty -> None
            is Cons -> {
                if (n == 0) None
                else Some(item.head() to item.tail().take3(n - 1))
            }
        }
    }

fun <ITEM> Stream<ITEM>.takeWhile3(p: (ITEM) -> Boolean): Stream<ITEM> =
    unfold(this) { item ->
        when (item) {
            is Empty -> None
            is Cons ->
                if (p(item.head())) Some(item.head() to item.tail())
                else None
        }
    }

fun <ITEM1, ITEM2, RESULT> Stream<ITEM1>.zipWith(
    that: Stream<ITEM2>,
    combine: (ITEM1, ITEM2) -> RESULT
): Stream<RESULT> =
    unfold(this to that) { (item1, item2) ->
        when (item1) {
            is Empty -> None
            is Cons -> when (item2) {
                is Empty -> None
                is Cons -> Some(combine(item1.head(), item2.head()) to (item1.tail() to item2.tail()))
            }
        }
    }

fun <ITEM1, ITEM2> Stream<ITEM1>.zipAll(that: Stream<ITEM2>): Stream<Pair<Option<ITEM1>, Option<ITEM2>>> =
    unfold(this to that) { (item1, item2) ->
        when (item1) {
            is Empty -> when (item2) {
                is Empty -> None
                is Cons -> Some((None to Some(item2.head())) to (Empty.empty<ITEM1>() to item2.tail()))
            }

            is Cons -> when (item2) {
                is Empty -> Some((Some(item1.head()) to None) to (item1.tail() to Empty.empty()))
                is Cons -> Some((Some(item1.head()) to Some(item2.head())) to (item1.tail() to item2.tail()))
            }
        }
    }

5.14

fun <ITEM> Stream<ITEM>.startsWith(that: Stream<ITEM>): Boolean =
    this.zipWith(that) { item1, item2 ->
        item1 == item2
    }.forAll { it }

5.15

fun <ITEM> Stream<ITEM>.tails(): Stream<Stream<ITEM>> =
    unfold(this) { item ->
        when(item) {
            is Cons -> Some(item to item.tail())
            is Empty -> None
        }
    }

Fold 란?

fold란 고차 함수중 하나입니다.
재귀적으로 정의된 자료구조를 분석하고, 전달받은 명령을 사용하여 재결합하고, 재귀적으로 수행된 결과들로 반환값을 만들어내는 연산입니다. 

Fold (고차 함수) - 위키백과, 우리 모두의 백과사전 (wikipedia.org)

 

하나하나 의미를 살펴봅시다.

 

재귀적으로 정의된 자료구조
sealed class List<out A> {
    companion object {
        fun <A> of(vararg aa: A): List<A> {
            val tail = aa.sliceArray(1..<aa.size)
            return if (aa.isEmpty()) Nil else Cons(aa[0], of(*tail))
        }
    }
}
object Nil : List<Nothing>()
data class Cons<out A>(val head: A, val tail: List<A>) : List<A>()

Cons는 멤버변수로 List 타입을 갖기 때문에 재귀적인 자료구조라고 할 수 있습니다.

전달받은 명령을 사용하여 재결합
fun <A, B> foldRight(xs: List<A>, z: B, f: (A, B) -> B): B =
    when (xs) {
        is Nil -> z
        is Cons -> f(xs.head, foldRight(xs.tail, z, f))
    }

fun main() {
    val list = Cons(3, Cons(6, Cons(9, Nil)))
    foldRight(list, 0) { a, b ->
        a + b
    }
}

foldRight함수에 명령으로 { a, b -> a + b } 람다를 전달했습니다. 
foldRight안에서는 f(xs.head, foldRight(xs.tail, z, f)) 를 통해 재결합 하고 있습니다. 

재귀적으로 수행된 결과들로 반환값을 만들어 낸다
is Cons -> f(xs.head, foldRight(xs.tail, z, f))

f연산은 { a, b -> a + b }입니다. 

xs.head + foldRight(xs.tail, z, f) 를 결과로 반환합니다.

xs.head + (xs.tail.head + foldRight(xs.tail.tail, z, f))
xs.head + (xs.tail.head + (xs.tail.tail.head + foldRight(xs.tail.tail.tail, z, f))) ...
이렇게 재귀적으로 계산되다가 Nil 조건을 만나면 반환값을 반들어냅니다. 

 

Fold 방향

fold는 "재귀적으로 수행" 을 어떤 방향으로 적용하는 지에 따라 FoldLeft와 FoldRight가 나뉘어집니다.

  • FoldLeft 
    • 요소의 앞에서 부터 계산
    • (((1 + 2) + 3) + 4) +5
  • FoldRight 
    • 요소의 뒤에서 부터 계산
    • 1 + (2 + (3 + (4 + 5)))

FoldLeft

sealed class List2<out A> {
    companion object {
        fun <A> of(vararg aa: A): List2<A> {
            val heads = aa.sliceArray(0..<aa.size - 1)
            return if (aa.isEmpty()) Nil2 else Cons2(of(*heads), aa.last())
        }
    }
}
object Nil2 : List2<Nothing>()
data class Cons2<out A>(val heads: List2<A>, val last: A) : List2<A>()

fun <ITEM1, ITEM2> foldLeft(list: List2<ITEM1>, defaultValue: ITEM2, combine: (ITEM2, ITEM1) -> ITEM2): ITEM2 =
    when (list) {
        is Nil2 -> defaultValue
        is Cons2 -> combine(foldLeft(list, defaultValue, combine), list.last)

    }

foldLeft를 구현하기위해 List2 자료구조를 새로 만들었습니다. 

// List
val list = Cons(3, Cons(6, Cons(9, Nil)))


// List2
val list1 = Cons2(Cons2(Cons2(Nil2, 1), 2), 3)

리스트를 정의할 때 차이점이 있습니다. 

첫번째 요소에 도달할 때 까지 재귀적으로 foldLeft를 호출하게 됩니다. 

List2를 새로 만들지 않고, foldLeft를 구현할 수 도 있습니다. 

tailrec fun <ITEM1, ITEM2> foldLeft(list: List<ITEM1>, defaultValue: ITEM2, combine: (ITEM2, ITEM1) -> ITEM2): ITEM2 =
    when(list) {
        is Nil -> defaultValue
        is Cons -> foldLeft2(list.tail, combine(defaultValue, list.head), combine)
    }

첫 번째 요소부터, defaultValue에 누적하면서 마지막요소까지 계산해갑니다

위의 List2의 예시는 틀렸습니다. 
앞에서 부터 계산하는지와 뒤에서 계산하는지와는 별개로 재귀적 호출된 함수의 결과를 언제 조합하냐가 핵심입니다. 

먼저 계산한 후 다음 재귀 호출을 하면, Left
모든 재귀호출이 끝난 후 콜스택을 돌아오면서 계산을 마치면 Right입니다. 

The laboratory of the wizard :: 재귀호출의 구분과 foldLeft/foldRight (tistory.com)

 

FoldRight

fun <ITEM1, ITEM2> foldRight(list: List<ITEM1>, defaultValue: ITEM2, combine: (ITEM1, ITEM2) -> ITEM2): ITEM2 =
    when (list) {
        is Nil -> defaultValue
        is Cons -> combine(list.head, foldRight(list.tail, defaultValue, combine))
    }

foldRight는 위에서 봤던 정의와 동일합니다. 

마지막 요소를 만날 때까지 재귀적으로 호출되다가, 다시 돌아오면서 combine되는 형식입니다. 

 

특징

꼬리 재귀

foldLeft는 꼬리 재귀로 만들 수 있지만, foldRight는 꼬리 재귀로 만들 수 없다. 

 

결합 법칙 성립하지 않는 연산

  • FoldLeft 
    • 요소의 앞에서 부터 계산
    • (((1 + 2) + 3) + 4) +5
  • FoldRight 
    • 요소의 뒤에서 부터 계산
    • 1 + (2 + (3 + (4 + 5)))

 

위에서 이런 정의를 했었습니다.
이 정의에서는 + 연산을 하였기 때문에 foldLeft, foldRight 모두 같은 결과가 나왔습니다. 

+연산은 결합법칙이 적용되기 때문인데, 결합 법칙이 적용되지 않는 ÷ 연산은 어떨까요?

4 -> 2 -> 2 리스트가 있다고 해보겠습니다. 

  • foldLeft : ((4 / 2) / 2) = 1
  • foldRight : (4 / (2 / 2)) = 4

이렇게 결합 법칙이 적용되지 않는 연산은 foldLeft와 foldRight의 결과가 다를 수 있습니다. 

FoldLeft와 FoldRight 제대로 알고 사용하기 | leeyh0216's devlog

+ Recent posts