사실이 아니라 공부한 내용과 생각을 정리한 글입니다. 언제든 가르침을 주신다면 감사하겠습니다
서론
흔히들 webflux는 thread pool 기반의 모델과 달리 event loop 기반 모델을 사용하여 높은 처리량을 얻을 수 있다고 말한다.
위 말은 event loop 가 thread pool 기반의 모델보다 스레드를 더 효율적으로 사용한다는 것을 내포하고 있다.
즉 thread pool 기반의 모델은 io waiting 이 발생하면 thread 가 block 되는 반면, event loop 기반의 모델은 io waiting 이 발생하면 thread 가 block 되지 않고 다시 스레드풀에 반환되어 다른 처리를 할 수 있는 상태가 된다.
하지만 단순히 webflux 를 사용한다고 해서 무조건 thread를 효율적으로 사용할 수 있는 것은 아니다.
webflux를 통해 작성된 코드가 전부 blocking 지점 없이 reactive stack으로 구성되어야 비로소 높은 처리량을 확보할 수 있다.
이번 글을 통해 webflux 에서 제공하는 map과 flatmap을 비교해 보면서 기본동작을 이해하고, 나아가 webflux를 사용할 때 fully reactive stack으로 구성해야 하는 이유에 대해서도 공감할 수 있으면 좋겠다.
(예시코드는 https://github.com/rnjstjdgh/api-call-test 에 있습니다.)
테스트를 위한 Mock api server
@RestController
class TestController {
@GetMapping("/test/{sleep}/{idx}")
fun test(
@PathVariable("sleep") sleep: Long,
@PathVariable("idx") idx: Long,
): ResponseEntity<String> {
Thread.sleep(sleep)
return ResponseEntity.ok("result${idx}")
}
}
sleep millisecond 가 걸리는 외부 api를 제공하기 위한 mock api server이다.
api 요청 path에 sleep과 idx를 path variable로 넘겨주면 sleep millisecond 만큼 대기하다가 "result${idx}"를 응답하는 api를 제공한다.
해당 api endpoint를 제공하는 서버를 localhost:8081로 기동 한다.
FeignService
@Service
@FeignClient(
contextId = "testFeignService",
name = "test",
url = "\${test.api.url}", // localhost:8081 로 설정
configuration = [FeignConfig::class]
)
interface FeignService {
@GetMapping("/test/{sleep}/{idx}")
fun testCall(
@PathVariable("sleep") sleep: Long,
@PathVariable("idx") idx: Long,
): String
}
mock api server를 호출하기 위한 feign client를 생성한다.
feign는 동기식으로 api를 호출한다.
WebClientConfig
@Configuration
class WebClientConfig {
companion object {
val THREADS = 1
val THREADFACTORY: BasicThreadFactory = BasicThreadFactory.Builder()
.namingPattern("HttpThread-%d")
.daemon(true)
.priority(Thread.MAX_PRIORITY)
.build()
val EXECUTOR = ThreadPoolExecutor(
THREADS,
THREADS,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(),
THREADFACTORY,
ThreadPoolExecutor.AbortPolicy()
)
val RESOURCE = NioEventLoopGroup(THREADS, EXECUTOR)
}
@Bean(name = ["webClient"])
fun buildWebClient(): WebClient {
val provider = ConnectionProvider.builder("custom-provider")
.maxConnections(100)
.maxIdleTime(Duration.ofSeconds(58))
.maxLifeTime(Duration.ofSeconds(58))
.pendingAcquireTimeout(Duration.ofMillis(5000))
.pendingAcquireMaxCount(-1)
.evictInBackground(Duration.ofSeconds(30))
.lifo()
.metrics(false)
.build()
val reactorResourceFactory = ReactorResourceFactory().apply {
connectionProvider = provider
loopResources = LoopResources { RESOURCE }
isUseGlobalResources = false
}
return WebClient.builder()
.baseUrl("http://localhost:8081")
.clientConnector(ReactorClientHttpConnector(reactorResourceFactory) { httpClient ->
httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(5000))
.doOnConnected { conn ->
conn.addHandlerLast(ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
.addHandlerLast(WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS))
}
})
.build()
}
}
WebClient 빈을 만드는 코드다.
코드가 다소 복잡할 수 있지만 핵심적인 부분만 차분히 살펴보자.
val THREADS = 1
val THREADFACTORY: BasicThreadFactory = BasicThreadFactory.Builder()
.namingPattern("HttpThread-%d")
.daemon(true)
.priority(Thread.MAX_PRIORITY)
.build()
val EXECUTOR = ThreadPoolExecutor(
THREADS,
THREADS,
0L,
TimeUnit.MILLISECONDS,
LinkedBlockingQueue(),
THREADFACTORY,
ThreadPoolExecutor.AbortPolicy()
)
val RESOURCE = NioEventLoopGroup(THREADS, EXECUTOR)
webClient를 만들 때 사용할 이벤트 그룹을 생성하는 코드인데, Executor를 넣어주고 있다.
Executor는 스레드풀을 지정할 수 있고 해당 스레드풀의 스레드 개수는 1개로 세팅되어 있다.
즉, webclient를 통한 api 호출을 위해 사용되는 스레드는 1개로 지정되어 있다.
MapFlatmapTestController
@RestController
class MapFlatMapTestController(
private val feignService: FeignService,
private val webClient: WebClient
) {
@GetMapping("/test/feign/map/{sleep}")
fun `feign_map`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.map {
feignService.testCall(sleep, it.toLong())
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
@GetMapping("/test/feign/flatmap/{sleep}")
fun `feign_flatmap`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.flatMap {
Mono.just(feignService.testCall(sleep, it.toLong()))
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
@GetMapping("/test/webclient/map/{sleep}")
fun `webclient_map`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String?>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.map {
webClientCall(sleep, it).block()
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
@GetMapping("/test/webclient/flatmap/{sleep}")
fun `webclient_flatmap`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String?>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.flatMap {
webClientCall(sleep, it)
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
private fun webClientCall(
sleep: Long,
idx: Int,
): Mono<String?> {
return webClient.get()
.uri("/test/$sleep/$idx")
.retrieve()
.bodyToMono(String::class.java)
}
}
위에서 만든 FeignService, WebClient를 가지고 mock api server를 다양한 방법으로 호출하는 컨트롤러이다.
이제 이 컨트롤러의 각각의 endpoint를 하나하나 살펴보면서 map과 flatmap의 동작을 비교해 보고 실제로 스레드가 어떻게 사용되는지 살펴보자
1. feign + map
@GetMapping("/test/feign/map/{sleep}")
fun `feign_map`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.map {
feignService.testCall(sleep, it.toLong())
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
Flux를 통해 1부터 10까지 데이터를 흘려보내면서 feign client 를 사용하여 mock api server 를 호출하고 있다.
해당 코드를 실행해 보면 아래와 같은 결과를 얻을 수 있다.
Flux 를 기반으로 api를 호출했지만 map 내부에서 사용하는 feign에서 blocking 이 발생했기 때문에 10초가 소요된 것을 확인할 수 있다.
그렇다면 feign을 사용해 api 호출하는 부분을 Mono로 감싸면 어떻게 될까?
2. feign + flatmap
@GetMapping("/test/feign/flatmap/{sleep}")
fun `feign_flatmap`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.flatMap {
Mono.just(feignService.testCall(sleep, it.toLong()))
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
feign을 통한 동기통신 구간을 Mono로 감싸고 flatmap을 사용했지만 여전히 10초가 소요되는 것을 확인할 수 있다.
이를 통해 io-waiting 구간을 Mono로 감싸고 flatmap을 사용한다고 할지라도 reactive stack을 사용하지 않는다면 결국 thread blocking 은 발생한다는 것을 알 수 있다.
3. webclient + map
@GetMapping("/test/webclient/map/{sleep}")
fun `webclient_map`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String?>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.map {
webClientCall(sleep, it).block()
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
이번에는 reactive stack 인 webclient를 사용해 외부 api를 호출했다.
하지만 block 메서드를 사용했기 때문에 해당 지점에서 thread가 blocking 되어 총 10초가 소요된 것을 확인할 수 있다.
4. webclient + flatmap
@GetMapping("/test/webclient/flatmap/{sleep}")
fun `webclient_flatmap`(
@PathVariable("sleep") sleep: Long,
): ResponseEntity<List<String?>> {
println("호출전: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
val resultList = Flux.range(1,10)
.flatMap {
webClientCall(sleep, it)
}
.collectList().block()
println("호출후: ${LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))}")
return ResponseEntity.ok(resultList)
}
이번에는 reactive stack 인 webclient를 사용하면서 동시에, webclient 가 리턴하는 Mono를 flatmap을 사용해 그대로 변환하였다.
그렇게 사용하니 드디어 10개의 api를 1초 만에 처리하는 것을 확인할 수 있었다!
결론
- flatmap을 사용한다고 해서 반드시 thread 가 non-blocking 하게 동작하지는 않음
- blocking 작업을 mono나 flux 같은 publisher로 감싼다고 non-blocking으로 동작하는 게 아님
- 결국 기술 자체가 reactive 해야 함
- flatmap 은 reactive stack을 사용한 데이터 변환 작업에서 사용할 수 있음
- 실무적으로는
- cpu bound 작업은 map으로 엮어주고
- io bound 작업은 이 작업을 수행할 수 있는 reactive 기술을 찾고, 그 기술을 사용하면서 flatmap으로 엮어주자
- io bound 작업이지만 이 작업을 지원하는 reactive 기술이 없다면...?? 어쩔 수 없이 특정 스레드풀을 만들고 그 스레드풀에 blocking 작업을 격리하자
- webflux 를 사용할 때 blocking 포인트 없이 fully reactive stack 을 구성하려고 노력하자
참고자료
- https://www.baeldung.com/java-reactor-map-flatmap
- https://www.youtube.com/watch?v=I0zMm6wIbRI
'java > thread(동시성)' 카테고리의 다른 글
API 호출 관점에서 WebClient 와 Coroutine 조합해보기 (0) | 2024.01.30 |
---|---|
API 호출 관점에서 TaskExecutor & Coroutine & Webclient 비교해보기 (0) | 2023.09.08 |
비동기 & 논블러킹 의미 분석 (0) | 2022.08.11 |
[Udemy: Java Multithreading, Concurrency & Performance Optimization] Motivation (0) | 2021.11.25 |
댓글