본문 바로가기
java/thread(동시성)

스레드 block 관점에서 Webflux Map vs Flatmap 비교

by 권성호 2023. 10. 2.

 

사실이 아니라 공부한 내용과 생각을 정리한 글입니다. 언제든 가르침을 주신다면 감사하겠습니다

서론

흔히들 webflux는 thread pool 기반의 모델과 달리 event loop 기반 모델을 사용하여 높은 처리량을 얻을 수 있다고 말한다.

위 말은 event loop 가 thread pool 기반의 모델보다 스레드를 더 효율적으로 사용한다는 것을 내포하고 있다.

즉 thread pool 기반의 모델은 io waiting 이 발생하면 thread 가 block 되는 반면, event loop 기반의 모델은 io waiting 이 발생하면 thread 가 block 되지 않고 다시 스레드풀에 반환되어 다른 처리를 할 수 있는 상태가 된다.

하지만 단순히 webflux 를 사용한다고 해서 무조건 thread를 효율적으로 사용할 수 있는 것은 아니다.

webflux를 통해 작성된 코드가 전부 blocking 지점 없이 reactive stack으로 구성되어야 비로소 높은 처리량을 확보할 수 있다.

이번 글을 통해 webflux 에서 제공하는 map과 flatmap을 비교해 보면서 기본동작을 이해하고, 나아가 webflux를 사용할 때 fully reactive stack으로 구성해야 하는 이유에 대해서도 공감할 수 있으면 좋겠다.

(예시코드는 https://github.com/rnjstjdgh/api-call-test 에 있습니다.)

테스트를 위한 Mock api server 

@RestController
class TestController {
    
    @GetMapping("/test/{sleep}/{idx}")
    fun test(
        @PathVariable("sleep") sleep: Long,
        @PathVariable("idx") idx: Long,
    ): ResponseEntity<String> {
        Thread.sleep(sleep)
        return ResponseEntity.ok("result${idx}")
    }
}

sleep millisecond 가 걸리는 외부 api를 제공하기 위한 mock api server이다.

api 요청 path에 sleep과 idx를 path variable로 넘겨주면 sleep millisecond 만큼 대기하다가 "result${idx}"를 응답하는 api를 제공한다.

해당 api endpoint를 제공하는 서버를 localhost:8081로 기동 한다.

 

FeignService

@Service
@FeignClient(
    contextId = "testFeignService",
    name = "test",
    url = "\${test.api.url}",	// localhost:8081 로 설정
    configuration = [FeignConfig::class]
)
interface FeignService {

    @GetMapping("/test/{sleep}/{idx}")
    fun testCall(
        @PathVariable("sleep") sleep: Long,
        @PathVariable("idx") idx: Long,
    ): String
}

mock api server를 호출하기 위한 feign client를 생성한다. 

feign는 동기식으로 api를 호출한다.

 

WebClientConfig

@Configuration
class WebClientConfig {

    companion object {
        val THREADS = 1
        val THREADFACTORY: BasicThreadFactory = BasicThreadFactory.Builder()
            .namingPattern("HttpThread-%d")
            .daemon(true)
            .priority(Thread.MAX_PRIORITY)
            .build()

        val EXECUTOR = ThreadPoolExecutor(
            THREADS,
            THREADS,
            0L,
            TimeUnit.MILLISECONDS,
            LinkedBlockingQueue(),
            THREADFACTORY,
            ThreadPoolExecutor.AbortPolicy()
        )

        val RESOURCE = NioEventLoopGroup(THREADS, EXECUTOR)
    }

    @Bean(name = ["webClient"])
    fun buildWebClient(): WebClient {
        val provider = ConnectionProvider.builder("custom-provider")
            .maxConnections(100)
            .maxIdleTime(Duration.ofSeconds(58))
            .maxLifeTime(Duration.ofSeconds(58))
            .pendingAcquireTimeout(Duration.ofMillis(5000))
            .pendingAcquireMaxCount(-1)
            .evictInBackground(Duration.ofSeconds(30))
            .lifo()
            .metrics(false)
            .build()

        val reactorResourceFactory = ReactorResourceFactory().apply {
            connectionProvider = provider
            loopResources = LoopResources { RESOURCE }
            isUseGlobalResources = false
        }

        return WebClient.builder()
            .baseUrl("http://localhost:8081")
            .clientConnector(ReactorClientHttpConnector(reactorResourceFactory) { httpClient ->
                httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .responseTimeout(Duration.ofMillis(5000))
                    .doOnConnected { conn ->
                        conn.addHandlerLast(ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                            .addHandlerLast(WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                    }
            })
            .build()
    }
}

WebClient 빈을 만드는 코드다.

코드가 다소 복잡할 수 있지만 핵심적인 부분만 차분히 살펴보자.

val THREADS = 1
val THREADFACTORY: BasicThreadFactory = BasicThreadFactory.Builder()
    .namingPattern("HttpThread-%d")
    .daemon(true)
    .priority(Thread.MAX_PRIORITY)
    .build()

val EXECUTOR = ThreadPoolExecutor(
    THREADS,
    THREADS,
    0L,
    TimeUnit.MILLISECONDS,
    LinkedBlockingQueue(),
    THREADFACTORY,
    ThreadPoolExecutor.AbortPolicy()
)

val RESOURCE = NioEventLoopGroup(THREADS, EXECUTOR)

webClient를 만들 때 사용할 이벤트 그룹을 생성하는 코드인데, Executor를 넣어주고 있다.

Executor는 스레드풀을 지정할 수 있고 해당 스레드풀의 스레드 개수는 1개로 세팅되어 있다.

즉, webclient를 통한 api 호출을 위해 사용되는 스레드는 1개로 지정되어 있다.

 

MapFlatmapTestController

@RestController
class MapFlatMapTestController(
        private val feignService: FeignService,
        private val webClient: WebClient
) {

    @GetMapping("/test/feign/map/{sleep}")
    fun `feign_map`(
            @PathVariable("sleep") sleep: Long,
    ): ResponseEntity<List<String>> {
        println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")

        val resultList = Flux.range(1,10)
                .map {
                    feignService.testCall(sleep, it.toLong())
                }
                .collectList().block()

        println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
        return ResponseEntity.ok(resultList)
    }

    @GetMapping("/test/feign/flatmap/{sleep}")
    fun `feign_flatmap`(
            @PathVariable("sleep") sleep: Long,
    ): ResponseEntity<List<String>> {
        println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")

        val resultList = Flux.range(1,10)
                .flatMap {
                    Mono.just(feignService.testCall(sleep, it.toLong()))
                }
                .collectList().block()

        println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
        return ResponseEntity.ok(resultList)
    }

    @GetMapping("/test/webclient/map/{sleep}")
    fun `webclient_map`(
            @PathVariable("sleep") sleep: Long,
    ): ResponseEntity<List<String?>> {
        println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
        val resultList = Flux.range(1,10)
                .map {
                    webClientCall(sleep, it).block()
                }
                .collectList().block()

        println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
        return ResponseEntity.ok(resultList)
    }

    @GetMapping("/test/webclient/flatmap/{sleep}")
    fun `webclient_flatmap`(
            @PathVariable("sleep") sleep: Long,
    ): ResponseEntity<List<String?>> {
        println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")

        val resultList = Flux.range(1,10)
                .flatMap {
                    webClientCall(sleep, it)
                }
                .collectList().block()

        println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
        return ResponseEntity.ok(resultList)
    }

    private fun webClientCall(
            sleep: Long,
            idx: Int,
    ): Mono<String?> {
        return webClient.get()
                .uri("/test/$sleep/$idx")
                .retrieve()
                .bodyToMono(String::class.java)
    }
}

위에서 만든 FeignService, WebClient를 가지고 mock api server를 다양한 방법으로 호출하는 컨트롤러이다.

이제 이 컨트롤러의 각각의 endpoint를 하나하나 살펴보면서 map과 flatmap의 동작을 비교해 보고 실제로 스레드가 어떻게 사용되는지 살펴보자

 

1. feign + map

@GetMapping("/test/feign/map/{sleep}")
fun `feign_map`(
        @PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String>> {
    println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")

    val resultList = Flux.range(1,10)
            .map {
                feignService.testCall(sleep, it.toLong())
            }
            .collectList().block()

    println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
    return ResponseEntity.ok(resultList)
}

Flux를 통해 1부터 10까지 데이터를 흘려보내면서 feign client 를 사용하여 mock api server 를 호출하고 있다.

해당 코드를 실행해 보면 아래와 같은 결과를 얻을 수 있다.

sleep 을 1000 으로 설정해서 호출
약 10초가 소요되었다.

Flux 를 기반으로 api를 호출했지만 map 내부에서 사용하는 feign에서 blocking 이 발생했기 때문에 10초가 소요된 것을 확인할 수 있다.

그렇다면 feign을 사용해 api 호출하는 부분을 Mono로 감싸면 어떻게 될까?

2. feign + flatmap

@GetMapping("/test/feign/flatmap/{sleep}")
fun `feign_flatmap`(
        @PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String>> {
    println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")

    val resultList = Flux.range(1,10)
            .flatMap {
                Mono.just(feignService.testCall(sleep, it.toLong()))
            }
            .collectList().block()

    println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
    return ResponseEntity.ok(resultList)
}

이번에는 Mono 로 감싸고 flatmap 을 사용했다.
역시나 10초가 소요되는 것을 확인할 수 있다.

feign을 통한 동기통신 구간을 Mono로 감싸고 flatmap을 사용했지만 여전히 10초가 소요되는 것을 확인할 수 있다.

이를 통해 io-waiting 구간을 Mono로 감싸고 flatmap을 사용한다고 할지라도 reactive stack을 사용하지 않는다면 결국 thread blocking 은 발생한다는 것을 알 수 있다.

 

3. webclient + map

@GetMapping("/test/webclient/map/{sleep}")
fun `webclient_map`(
        @PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String?>> {
    println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
    val resultList = Flux.range(1,10)
            .map {
                webClientCall(sleep, it).block()
            }
            .collectList().block()

    println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
    return ResponseEntity.ok(resultList)
}

이번에는 reactive stack 인 webclient 를 사용했다
여전히 10초가 소요되고 있다.

이번에는 reactive stack 인 webclient를 사용해 외부 api를 호출했다.

하지만 block 메서드를 사용했기 때문에 해당 지점에서 thread가 blocking 되어 총 10초가 소요된 것을 확인할 수 있다.

 

4. webclient + flatmap

@GetMapping("/test/webclient/flatmap/{sleep}")
fun `webclient_flatmap`(
        @PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String?>> {
    println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")

    val resultList = Flux.range(1,10)
            .flatMap {
                webClientCall(sleep, it)
            }
            .collectList().block()

    println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
    return ResponseEntity.ok(resultList)
}

webclient 를 사용하면서 flatmap 을 사용했다
1초만에 10개 api 를 다 처리한것을 확인할 수 있다!

이번에는 reactive stack 인 webclient를 사용하면서 동시에, webclient 가 리턴하는 Mono를 flatmap을 사용해 그대로 변환하였다.

그렇게 사용하니 드디어 10개의 api를 1초 만에 처리하는 것을 확인할 수 있었다!

 

결론

  • flatmap을 사용한다고 해서 반드시 thread 가 non-blocking 하게 동작하지는 않음
    • blocking 작업을 mono나 flux 같은 publisher로 감싼다고 non-blocking으로 동작하는 게 아님
    • 결국 기술 자체가 reactive 해야 함
  • flatmap 은 reactive stack을 사용한 데이터 변환 작업에서 사용할 수 있음
  • 실무적으로는
    • cpu bound 작업은 map으로 엮어주고
    • io bound 작업은 이 작업을 수행할 수 있는 reactive 기술을 찾고, 그 기술을 사용하면서 flatmap으로 엮어주자
    • io bound 작업이지만 이 작업을 지원하는 reactive 기술이 없다면...?? 어쩔 수 없이 특정 스레드풀을 만들고 그 스레드풀에 blocking 작업을 격리하자
  • webflux 를 사용할 때 blocking 포인트 없이 fully reactive stack 을 구성하려고 노력하자

참고자료

 

 

 

댓글