엄코딩의 개발 일지

Channels ( experimental )

 

지연된 값은 코루틴간에 단일 값을 전송하는 편리한 방법을 제공합니다. Channels는 값의 흐름을 전송하는 방법을 제공합니다.

 

Channels are an experimental feature of kotlinx.coroutines. 
Their API is expected to evolve in the upcoming updates of the kotlinx.coroutines library with potentially breaking changes.

 

Channel basics

 

Channel 은 개념적으로 BlockingQueue 와 매우 유사합니다. 한가지 중요한 차이점은 blocking put 연산 대신에 suspending send, 그리고 blocking take 연산 대신에 suspending receive를 갖습니다.

 

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

 

출력 결과

 

1
4
9
16
25
Done!

 

Closing and iteration over channels

 

queue와 다르게 채널은 더 이상 요소가 없다는 것은 나타낼 수 있습니다. receiver 측에서 for루프를 사용하여 채널로부터 요소들을 편리하게 받을 수 있습니다. 

 

개념적으로, close 는 특별한 close 토큰을 채널에 보내는 것과 같습니다. close 토큰이 수신되는 즉시 반복이 중지되므로 이전에 전송된 모든 요소가 수신된다고 보장할 수 있습니다.

 

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

 

출력 결과

 

1
4
9
16
25
Done!

 

Building channel producers

 

코루틴이 일련의 요소를 생산하는 것은 매우 일반적입니다. 이는 동기 코드에서 종종 발견할 수 있는 producer - consumer 코드의 일부분입니다. producer 를 매개 변수로 사용하는 함수로 추상화 할 수 있지만, 결과가 함수에서 return 되어야 한다는 일반적 상식에 어긋납니다.

편리한 코루틴 빌더 produce 를 통해서 producer 측에서 이를 쉽게할 수 있습니다. 그리고 consumer 측 for loop를 대신할 확장 함수 consumeEach 가 있습니다.

 

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

 

출력 결과

 

1
4
9
16
25
Done!

 

Pipelines

 

파이프라인은 하나의 코루틴이 무한의 값 스트림을 생성하는 패턴입니다.

 

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

 

다른 코루틴 또는 코루틴들에서는 이 스트림을 소비하고, 일부 처리를 수행합니다. 그리고 다른 결과를 생산합니다. 아래 예에서 숫자는 단지 제곱입니다.

 

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

 

main code는 전체 파이프라인을 연결하고 시작합니다.

 

fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

 

출력 결과

 

1
4
9
16
25
Done!

 

코루틴을 만드는 모든 함수들은 CoroutineScope의 확장으로 정의 되므로, 구조화된 동시성에 의존하여, 어플리케이션에 느린 전역 코루틴이 없는지 확인할 수 있습니다.

 

Prime numbers with pipeline

 

코루틴의 파이프라인을 사용하여 소수를 생성하는 예를 설명해 보겠습니다.

 

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

 

다음으로 파이프라인에 들어오는 스트림을 필터링하여 소수면 send 합니다.

 

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

 

이제 2에서 숫자의 흐름을 시작하고, 현재 채널로부터 소수를 가져옵니다. 그리고 각 소수에 대해 파이프라인 단계를 시작하여 새로운 파이프라인을 만듭니다.

 

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 

 

다음 예제는 메인스레드의 context에서 전체 파이프라인을 실행하는 10개의 소수를 출력합니다.

모든 코루틴은 메인 runBlocking 코루틴의 범위에서 시작되므로, 모든 코루틴의 목록을 명시할 필요는 없습니다.

10개의 소수를 출력하고 cancelChildren 확장 함수를 사용하여 모든 children 코루틴을 취소합니다.

 

fun main() = runBlocking {
    var cur = numbersFrom(2)
    for (i in 1..10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish    
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

 

출력 결과

 

2
3
5
7
11
13
17
19
23
29

 

코루틴 빌더의 iterator 를 사용하여 동일한 파이프라인을 만들 수 있습니다.

produce를 iterator로 send를 yield로 receive를 next로 ReceiveChannel을 Iterator로 대체해보세요. 그리고 Coroutine scope를 제거해보세요. 더이상 runBlocking가 필요하지 않을 것입니다. 그러나, 채널을 사용하는 파이프라인의 이점은 Dispatchers.Default context에서 여러 CPU 코어를 사용할 수 있다는 것입니다.

 

어쩃든, 위처럼 소수를 찾는 방법은 극단적인 케이스로, 실용적이지 못합니다. 실제 파이프라인은 다른 suspending invocations ( 원격 서비스에 대한 비동기 호출 ) 을 포함하지만, 파이프라인은 비동기적인 produce와는 다르게 임의의 suspension을 허용하지 않기 때문에 sequence / iterator을 사용하여 만들 수 없습니다.

 

 

Fan-out

 

 

여러개의 코루틴은 동일한 채널에서 수신할 수 있으며 상호간에 작업을 분배할 수 있습니다. 주기적으로 정수를 생성하는 생산자 코루틴을 예로 들어보겠습니다.

 

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

 

그리고 여러개의 프로세서 코루틴을 가질 수 있습니다. 이 예에서는 id와 숫자만을 출력합니다.

 

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}

 

5개의 프로세서를 launch 하고, 약 1초동안 작동시켜보겠습니다.

 

val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all

 

출력 결과

 

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

 

출력 결과를 보면 알 수 있듯이, 비록 프로세서 아이디는 다를 수 있으나 number 값은 위와 동일할 것입니다.

producer 코루틴을 취소하면 채널이 닫히고 결국 프로세서 코루틴이 수행하는 채널에서 종료될 것입니다.

consumeEach 와는 다르게 for 루프 패턴은 다수의 코루틴에서 안전합니다. 만약 하나의 프로세서 코루틴이 실패하면, 다른 코루틴은 여전히 채널을 처리합니다. 반면에, consumeEach 로 작성된 프로세서는 정상 또는 비정상 완료시 항상 채널을 consumes ( 취소 ) 합니다. 

 

Fan-in

 

여러개의 코루틴이 동일한 채널로 전송할 수 있습니다. 예를 들어, 문자열 채널과, 특정 delay 를 지정하여 문자열을 채널에 반복적으로 보내는 함수를 보겠습니다.

 

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

 

출력 결과

 

foo
foo
BAR!
foo
foo
BAR!

 

 

Buffered channels

 

지금까지 보았던 채널에는 버퍼가 없었습니다. 버퍼링되지 않은 채널은 발신자와 수신자가 서로 만날 때 값을 전송합니다.

send 가 먼저 호출되면 receive 가 호출 될 때까지 일시 중단되고, receive가 먼저 호출되면 send 가 호출 될 때까지 일시 중단됩니다.

Channel() factory function 그리고 produce 빌더는 선택적으로 버퍼의 크기를 구체화하는 용량을 파라미터로 갖습니다.

버퍼는 발신자에게 일시 중단되기 전까지( BlockingQueue와 유사하게 버퍼가 꽉차기 전까지 ) 다수의 요소를 보내는 것을 허락합니다.

 

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine    
}

 

출력 결과

 

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

 

처음 요소 4개까지는 버퍼에 추가되고 다섯번째 요소를 보내려고 할 때 일시 중단됩니다.

 

Channels are fair

여러 코루틴에서 호출 한 순서와 관련하여 채널에 작업을 보내고 받는 것은 공정합니다. 그 순서는 First-In First-Out 입니다.

즉 호출할 첫번째 코루틴 receive 요소를 가져옵니다. 다음 예제에서는 두개의 코루틴 "ping" "pong" 는 공유 채널인 table 에서 ball 객체를 수신합니다.

 

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

 

출력 결과

 

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

 

때떄로 채널은 실행 프로그램의 특성으로 불공정한 실행을 처리할 수 있습니다.

 

Ticker channels

 

Ticker channel은 채널에서 마지막으로 소비된 이후 지연이 될 때마다 생성되는 특별한 rendezvous 채널 입니다.

쓸데없는 독립형으로 보일 수 있지만, 복잡한 시간 기반 produce 파이프라인 그리고 windowing , 시간 종속 처리를 수행하는 연산을 만드는데 유용합니다. Ticker channel select 에서 사용될 수 있습니다.

 

이러한 채널을 만들기 위해서는 factory method tciker 을 사용해야합니다. 더이상 필요 요소가 없다는 것을 나타내려면 ReceiveChannel.cancel 메소드를 사용하면 됩니다.

 

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

 

출력 결과

 

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit