CompletableFuture란?
java5부터 비동기 연산을 위해 추가된 Future 인터페이스의 문제를 보완해서 java8에 나온 구현한 클래스
CompletableFuture를 사용하기 전에 Future의 어떤 문제점을 보완했는지 살펴보자
Future의 문제점
- 여러 연산을 결합하기 어려운 문제
- 비동기 처리 중에 발생하는 예외를 처리하기 어려운 문제
CompletableFuture 개선 사항
Future 인터페이스와 *CompleteStage 인터페이스를 구현하여 하고 있다.
*CompleteStage: 여러 연산을 결합할 수 있도록 연산이 완료되면 다음 작업을 수행, 값을 연산하는 비동기식 연산 단계를 제공하는 인터페이스
따라서 CompletableFuture는 여러 연산을 결합한 비동기 연산 처리, 예외 처리 등을 위한 50여가지 다양한 메서드 제공한다.
Future | CompletableFuture |
블로킹 | 논블로킹 |
여러 연산을 함께 연결하기 어려움 | 여러 연산을 함께 연결 |
여러 연산 결과를 결합하기 어려움 | 여러 연산 결과를 결합 |
연산 성공 여부만 확인할 수 있고 예외 처리의 어려움 | exceptionally(), handle()을 통한 예외 처리 |
CompletableFuture 는 비동기 작업 처리를 위한 Java에서 제공하는 클래스이다.
일반적으로 비동기 작업을 처리하는 환경은 멀티 스레딩 환경이므로, 멀티 스레딩의 트레이드 오프를 살펴보자.
💡 멀티 스레딩의 장점
1. 스레드간 공유 영역: 프로세스를 사용하면 동시에 처리하던 일을 스레드로 구현할 경우 메모리 공간과 시스템 자원 소모가 줄어들게 된다.
2. 통신 비용: 프로세스간 통신 방법에 비해 스레드 간 통신 방법이 훨씬 간단하다.
3. 컨텍스트 스위칭 비용 저렴: 스레드의 컨텍스트 스위칭은 프로세스의 컨텍스트 스위칭과는 달리 캐시 메모리를 비울 필요가 없다. 프로세스가 달라지면 메모리 위치가 달라지므로 CPU에 저장된 캐시 데이터를 무효화하고 새로운 프로세스의 메모리 위치를 캐싱해야하므로 프로세스 간 컨텍스트 스위치이은 캐시 메모리도 비워야한다). 이에 Throughput이 향상되고 자원 소모가 줄어들며 자연스럽게 프로그램의 응답 시간이 단축된다.
💡 멀티 스레딩의 단점
1. 공유자원 문제: 스레드간 공유하는 영역이 생기므로 공유 자원에 대해 신경 써줘야한다.(race condition 해결을 위한 뮤텍스, 세마포어 등 매커니즘으로 해결해야함)
-> 공유 자원에 대한 동기화 작업 필요하다. 동기화를 통해 작업 처리 순서를 컨트롤러하고 공유 자원에 대한 접근을 컨트롤하는 것이다. 하지만 이로 인해 병목 현상이 발생하여 성능이 저하될 가능성이 높다. 그러므로 과도한 락으로 인해 병목 현상을 줄여야한다.
멀티 스레딩에서 공유 자원에 대한 접근 문제를 고려해야하지만, 지금 비동기/논블로킹 로직에서는 자원을 조회하고 생성하는 작업밖에 없다. 따라서 공유 자원에 대한 문제는 딱히 없을 것으로 생각이 들었다.
CompletableFuture 사용법
CompletableFuture라는 객체를 스레드에 할당해줄 작업을 감싸는 객체라고 생각하면 된다.
비동기로 처리할 작업 등록하기 (supplyAsync)
CompletableFuture에서 스레드에 할당할 작업을 만드들고 큐에 등록하는 메서드는 다음과 같다.
`supplyAsync(Supplier<U> supplier)`: Supplier를 인수로 supplyAsync()를 호출하면 ForkJoinPool.commonPool()에서 전달된 Supplier를 비동기적으로 호출한 뒤 CompleteableFuture 인스턴스를 반환하게 됩니다. -> 스레드에 할당한 작업 = 인자로 전달되는 supplier
여기서 참고로 get() 메서드를 호출하면 Supplier의 비동기 작업을 기다리다가 작업이 완료되면 결과를 반환하게 됩니다.
ExecutorService executor;
executor = Executors.newFixedThreadPool(2);
Supplier<String> slowTask = () -> {
String threadName = Thread.currentThread().getName();
System.out.println("[" + LocalTime.now() + "] " + threadName + " - 작업 시작");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("[" + LocalTime.now() + "] " + threadName + " - 작업 완료");
return "완료";
};
// 비동기 작업 등록
CompletableFuture.supplyAsync(slowTask, executor);
CompletableFuture.supplyAsync(slowTask, executor);
CompletableFuture.supplyAsync(slowTask, executor);
// 비동기 로그가 찍히도록 충분히 대기
Thread.sleep(7000);
System.out.println("[" + LocalTime.now() + "] 메인 종료");
executor.shutdown();
}
supplyAsync로 supplier(작업)을 큐에 등록하고, executor(스레드풀)의 스레드를 사용하여 비동기로 처리하는 코드이다.
순차적으로 연산 처리하기
비동기인데 순차적으로 처리한다는 게 모순인거 같은데, 그냥 비동기로 처리하는 작업에 대한 후처리를 해주는 메서드가 있다고 이해하면 된다.
위에서 비동기로 처리할 작업을 큐에 등록했다. 이 큐에 등록된 작업이 스레드를 할당 받아서 작업을 수행한 후, 이 작업에 대한 후처리가 순차적으로 일어나야한다면 아래 메서드를 사용하면된다.
/**
* 인자로 받은 Function을 사용하여 다음 연산 처리
* Function의 반환 값을 가지고 있는 CompletableFuture<U> 반환
*/
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
/**
* Consumer를 인자로 받고, 결과를 CompletableFuture<Void> 로 반환
* get() 호출 시 연산을 처리하고 Void 유형의 인스턴스를 반환
*/
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
/**
* Runnable을 인자로 받고, 결과를 CompletableFuture<Void> 로 반환
* get() 호출 없이 연산을 처리
*/
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
위 세개의 메서드에서 thenXxxAsync라는 메서드를 제공하는데, 이건 후처리도 비동기로 실행되는 것이다.
thenApply()
thenApply() 메서드는 이전 단계의 결괏값을 인수로 사용하고 전달한 Function을 다음 연산으로 사용한다. 실행결과로 Function의 반환 값을 가지고 있는 CompletableFuture<U>를 반환한다. -> 후처리된 결과가 필요한 경우 사용할 수 있다.
@Test
@DisplayName("주문 정보 조회가 완료되면 결제 정보를 조회하는 예시입니다.")
void thenApply() throws Exception {
String orderNo = "1234567890";
CompletableFuture<String> orderInfoFuture = CompletableFuture.supplyAsync(() -> getOrderInfo(orderNo));
CompletableFuture<String> paymentInfoFuture = orderInfoFuture.thenApply(s -> getPaymentInfo(s)); // 이전 연산 완료 후(2초 후) 다음 연산 처리
assertEquals("iPhone 15 / 결제 정보: 신용카드 1,200,000원", paymentInfoFuture.get());
}
private String getPaymentInfo(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// ..
}
return s + " / 결제 정보: 신용카드 1,200,000원";
}
thenAccept()
thenAccept() 메서드는 Consumer를 인자로 받고, 결과를 CompletableFuture<Void> 를 반환한다. 리턴 타입이 없는 로직을 호출할 때 사용할 수 있다. -> 후처리된 작업의 결과가 필요없는 경우 사용
@Test
@DisplayName("주문 정보 조회가 완료되면 주문 정보를 업데이트하는 예시입니다.")
void thenAccept() throws Exception {
String orderNo = "1234567890";
CompletableFuture<String> orderInfoFuture = CompletableFuture.supplyAsync(() -> getOrderInfo(orderNo));
CompletableFuture<Void> thenAccept = orderInfoFuture.thenAccept(s -> updateOrderInfo(s));
thenAccept.get(); // Completed update: iPhone 15 출력
}
private void updateOrderInfo(String s) {
System.out.println("Completed update: " + s);
}
thenRun()
thenRun()은 Runnable 객체를 인자로 받고, thenAccept() 와 동일하게 CompletableFuture<Void> 를 반환한다.
@Test
@DisplayName("주문 정보 조회가 완료되면 로그를 남기는 예시입니다.")
void thenRun() throws Exception {
String orderNo = "1234567890";
CompletableFuture<String> orderInfoFuture = CompletableFuture.supplyAsync(() -> getOrderInfo(orderNo));
orderInfoFuture.thenRun(() -> writeLog(orderNo));
}
private void writeLog(String orderNo) {
System.out.println("Completed query: " + orderNo);
}
💡 함수형 인터페이스 Supplier, Consumer, Runnable 사용 의도
비동기 작업에 대한 후처리를 하기 위해 순서를 보장하기 위해 위에서 설명한 메서드를 사용할 수 있다. 각 메서드의 인자값이 다른데, 이를 살펴보자.
Supplier: 입력 값이 있고, 반환 값도 있다. -> 이전 결과 값을 조작하고, 조작한 후의 결과값을 가지고 작업을 해야할 때 사용
Consumer: 입력 값이 있고, 반환 값이 없다. -> 이전 결과 값을 조작하지만, 조작한 후의 결과값이 필요 없을 때 사용
Runnalble: 입력 값도 없고, 반환 값도 없다. -> 이전 결과 값에 의존적이지 않은 작업을 실행할 때 사용
지금까지 비동기로 작업을 등록(supplyAsync()) 하고, 이 작업에 순차적인 연산을 처리하는 방법을 알아보았다. 내가 CompletableFuture 를 사용한 로직을 간단하게 설명하면,
- I/O Bound 작업(`rowDataScan`)들은 supplyAsync()로 비동기 작업으로 큐에 등록한다.
- CompletableFuture의 디폴트 스레드 풀인 ForkjoinPool 에 할당해줄 스레드가 있다면, 스레드를 할당받아서 작업을 수행한다.
- 작업이 완료된 I/O Bound 작업은 후처리(`fetchDataScan`)가 필요하므로 thenApplyAsync() 로 작업이 완료된 후 후처리를 진행하도록 진행. 후처리에 대한 작업도 비동기로 처리되도록 thenApply() 대신 thenApplyAsync()를 사용하였다.
- 하나의 작업에 대해 후처리가 모두 끝나면 .get()을 호출한다 -> 여기서 블로킹당한다.
I/O Bound 작업이 운영 서버에 AWS CLI를 실행하는 작업인데, 여기에 옵션이 추가된다면 CLI + 옵션 을 적용한 명령어를 서버 내부적으로 실행하게 된다. 이때 나오는 결과를 기준으로 다시 CLI 를 구성하고 이에 대해 다시 서버에서 CLI를 실행하는 구조이다.
예를 들어서 IAM 자원을 조회하는 AWS CLI를 입력한다고 해보자
aws iam list-access-keys
사용자가 위 명령어를 입력한 후, 명령어의 결과를 아래와 같이 보여준다.
명령어에 대한 결과는 아래와 같다.
{
"AccessKeyMetadata": [
{
"UserName": "Bob",
"Status": "Active",
"CreateDate": "2013-06-04T18:17:34Z",
"AccessKeyId": "AKIAIOSFODNN7EXAMPLE"
},
{
"UserName": "Bob",
"Status": "Inactive",
"CreateDate": "2013-06-06T20:42:26Z",
"AccessKeyId": "AKIAI44QH8DHBEXAMPLE"
}
]
}
여기서 사용자가 여러 속성 중에서 UserName이 포함된 결과만 보고 싶다면, commad2(옵션) 입력칸에 `userName` 의 값을 입력한다.
그러면 서버 내부적으로 `aws iam list-access-keys` 에 대한 결과에서 나온 UserName을 모두 추출한 다음에 UserName의 개수 만큼 CLI를 만들어서 서버에 실행하는 구조이다. (aws iam list-access-keys --user-name Bob, aws iam list-access-keys --user-name anotherName, ...)
따라서 rowDataScan은 command2(옵션)에 값이 있다면 command2(옵션)의 입력값과 일치한 값을 추출해서 그 수 만큼의 CLI를 만들고, 실행하는 것이다. 그래서 아래 그림을 보면 rawDataScan에서 사용자가 한개의 명령어만 입력했는데도 한개의 명령어에 대해서 여러번의 rawDataScan 로직이 돌아가고, 이에 대한 후처리 작업이 비동기/논블로킹으로 실행되는 것이다.
// 각각의 예외 처리 및 에러 로그 남기기 위해서 allOf 대신, get 사용
for (DescribeTaskDto describeTask : futureTaskList) { //하나의 작업에 여러개의 cli를 실행할 수 있음 왜? command2에 의해 cli가 여러개 실행될 수 있다.
CompletableFuture<List<DescribeEntity>> describeOfConfig = describeTask.getDescribeOfConfig();
DescribeTypeDto describeType = describeTask.getDescribeType();
List<DescribeEntity> taskResultData = new ArrayList<>();
try {
taskResultData = describeOfConfig.get();//여기서 get을 하면 각 비동기/논블로킹에 대한 작업이 블로킹/동기로 처리된다.(각작업을 리스트로 만들어서 비동기 논블로킹으로 처리되도록 해야함)
} catch (InterruptedException e) {
isSuccess = false;
e.fillInStackTrace();
} catch (ExecutionException e) {
awsCliService.saveErrorLog(account, describeType, e);
isSuccess = false;
e.fillInStackTrace();
}
resultData.addAll(taskResultData);
}
stopwatch.stop();
log.info(stopwatch.prettyPrint());
이렇게 되면 결국 사용자가 등록한 하나의 CLI 설정마다 블로킹(.get())을 하고 있다고 보여진다. 잘 생각해보면 각 CLI에 대해 실행하고, 파싱하는 작업이 블로킹으로 실행될 필요는 없어보인다. 그래서 이를 논블로킹으로 바꾸기 위해 아래와 같이 리팩토링하였다.
CompletableFuture는 위에서 언급했다시피 예외 처리 메서드를 제공한다. .exceptionally() 를 사용해서 예외처리 로직을 포함한 작업을 CompletableFuture로 감싸고주 모두 리스트에 담는다.
그 후에 리스트에 담긴 모든 작업을 .allOf()로 비동기/논블로킹으로 실행한다.
List<CompletableFuture<List<DescribeEntity>>> futures = futureTaskList.stream()
.map(task -> task.getDescribeOfConfig()
.exceptionally(ex -> {
awsCliService.saveErrorLog(account, task.getDescribeType(), ex);
return Collections.emptyList(); // 실패 시 빈 결과 반환
}))
.collect(Collectors.toList());
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
List<DescribeEntity> resultData = new ArrayList<>();
boolean isSuccess = true;
try {
allFutures.join(); // 모든 작업 완료까지 대기 (스레드 낭비 거의 없음)
for (int i = 0; i < futures.size(); i++) {
List<DescribeEntity> data = futures.get(i).join();
resultData.addAll(data);
}
} catch (CompletionException ex) {
isSuccess = false;
// 필요시 추가 예외 처리 로직
}
return Result.of(isSuccess, resultData);
여기서 allFutures.join()이 블로킹을 하게 되는데, 리스트로 담은 모든 작업을 완료할 때까지 블로킹하고, 작업이 완료되면 해당 데이터를 저장해야하므로 블로킹 작업이 들어가야한다.
그림으로 보면 아래와 같다.
최대한 쉽게 설명하려고 노력했는데, 특정 코드만 떼어서 설명한 것이라서 해당 로직이 어떻게 돌아가는지, 어떤 서비스를 수행하는 코드인지 이해하기 어려울 거라고 생각한다. 나중에 더 쉬운 예시로 내용을 보완하고자 한다.
스레드 풀에 대해서도 할 말이 많은데 그건 다음 포스팅에 올려보겠다. (ForkJoinPool과 ExecutorService 풀이 동작하는 방식이 다르고, 본인 서비스에 맞는 스레드풀을 적용해야한다.)
고찰
이 프로젝트에서 처음으로 자바에서 비동기/논블로킹을 구현해보았고, 멀티스레드 환경을 구성하는 방법도 공부할 수 있었다. 아쉬운 점은 실제 운영 환경을 분석해서 cpu 수를 확인하고 이에 적합한 스레드 수를 산출해보지 못했다는게 아쉬웠다. 그 당시 OS 지식 부족 + 스레드 적정 개수 공식의 존재르 알지 못함 으로 인해 그냥 스레드 수 때려맞추기로 성능을 개선했다. 다음에도 비동기 실행으로 성능을 개선할 여지가 보이는 프로젝트를 맡게 된다면 운영 환경 분석 + 시스템 지표 확인 + 공식 적용 을 하면서 스레드 개수를 조정하고 싶다.
또한 I/O Bound 와 CPU Bound 성격 측면에서 스레드 풀을 나눠서 실행했다면 더 좋지 않았을까? 라는 아쉬움도 있다.
하지만 실험적인 방식으로 스레드 개수를 산출하고 실제로 50% 이상의 성능개선을 이뤄내서, 공식에만 의존하는 것이 아닌 실험적으로 성능 측정하는 것의 중요성도 알게 되었다.
'언어 > JAVA' 카테고리의 다른 글
[자바 최적화] 2장 JVM 이야기 (3) | 2025.08.02 |
---|---|
[자바 최적화] 1장 성능과 최적화 (4) | 2025.08.01 |
[Java] Java Collection Framework (JCF) (1) | 2025.03.23 |
[JAVA] EOF 사용법 (0) | 2024.04.10 |
Optional 올바르게 사용하자 (0) | 2024.03.30 |