본문 바로가기
Spring-Batch

spring batch chunk 기반 처리 시 주의점: 스킵 시 중복처리 되는 문제

by 권성호 2023. 8. 8.

사실이 아니라 공부한 내용과 생각을 정리한 글입니다. 언제든 가르침을 주신다면 감사하겠습니다.

 

spring batch의 chunk 기반 처리를 사용해 배치를 개발하던 중 스킵을 사용할 시 ItemProcessor와 ItemWriter의 동작 원리에 의해 특정 item 이 중복처리 될 수 있음을 알게 되었고, 놓칠 수 있는 부분이라고 생각이 들어서 정리해 두려고 한다.

 

거두절미하고 예제 코드부터 살펴보자

예제 코드

@Configuration
@RequiredArgsConstructor
public class TestJobConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job testJob() {
        return jobBuilderFactory.get("testJob")
                .start(testStep())
                .build();
    }

    @Bean
    @JobScope
    public Step testStep() {
        return stepBuilderFactory.get("testStep")
                .<Integer, Integer>chunk(3)
                .reader(listItemReader())
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .skip(IllegalArgumentException.class)
                .skipLimit(1000)
                .build();
    }

    @StepScope
    public ItemReader<Integer> listItemReader() {
        return new ListItemReader(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }

    @StepScope
    public ItemProcessor<Integer, Integer> processor() {
        return item -> {
            System.out.println("[PROCESSOR] " + item);
//            if (item == 5) {
//                throw new IllegalArgumentException();
//            }
            return item;
        };
    }

    @StepScope
    public ItemWriter<Integer> writer() {
        return chunk -> {
//            if (chunk.contains(5)) {
//                throw new IllegalArgumentException();
//            }
            System.out.println("[WRITER] " + chunk);
        };
    }
}

listItemReader 에서 1부터 10까지 item을 읽어드리고 processor와 writer는 전달된 item을 출력한다.

청크 사이즈는 3으로 잡았기 때문에 실행 결과는 아래와 같다.

[PROCESSOR] 1
[PROCESSOR] 2
[PROCESSOR] 3
[WRITER] [1, 2, 3]
[PROCESSOR] 4
[PROCESSOR] 5
[PROCESSOR] 6
[WRITER] [4, 5, 6]
[PROCESSOR] 7
[PROCESSOR] 8
[PROCESSOR] 9
[WRITER] [7, 8, 9]
[PROCESSOR] 10
[WRITER] [10]

지극히 정상적이다.


스킵 시 item processor 중복처리 문제

그렇다면 item processor 를 아래와 같이 변경 후 실행해 보면 결과가 어떨까?

@StepScope
public ItemProcessor<Integer, Integer> processor() {
    return item -> {
        System.out.println("[PROCESSOR] " + item);
        if (item == 5) {
            throw new IllegalArgumentException();
        }
        return item;
    };
}

결과는 아래와 같다.

[PROCESSOR] 1
[PROCESSOR] 2
[PROCESSOR] 3
[WRITER] [1, 2, 3]
[PROCESSOR] 4
[PROCESSOR] 5	// <---- 이 부분에 주목!
[PROCESSOR] 4
[PROCESSOR] 6
[WRITER] [4, 6]
[PROCESSOR] 7
[PROCESSOR] 8
[PROCESSOR] 9
[WRITER] [7, 8, 9]
[PROCESSOR] 10
[WRITER] [10]

청크사이즈가 3이기 때문에 [1,2,3] 이 우선 처리되었다.

그 후 reader 가 4를 읽어서 프로세서로 보내고 프로세세는 처리에 성공한다.

그리고 reader 가 5을 읽어서 프로세서로 보냈는데, 5를 출력한 후 if 블락에 걸려 예외가 발생한다.

해당 예외는 스킵 정책에 등록되어 있기 때문에 job 이 실패하지 않고 스킵 관련 메커니즘으로 동작하기 시작한다.

즉, 지금 처리되고 있는 청크 대상인 [4,5,6] 중 5번에서 예외가 발생했기 때문에 한 트랜잭션으로 묶여있는 [4,5,6] 은 한 번에 롤백되고 5를 제외한 나머지 item을 대상으로 processor부터 다시 수행된다.

이러한 동작은 청크단위로 트랜잭션이 묶인다는 점에서 자연스러워 보인다. 즉, 5번에서 발생한 예외로 인해 4번에서 처리된 것 까지 롤백이 되었기 때문에, 4번은 다시 처리해야 한다는 논리로 보인다.

여기에는 한가지 주의할 만한 점이 있다.

청크기반 스텝을 생성할 때 지정한 트랜잭션 매니저에 의해 관리되는 디비 오퍼레이션만 포함한다면, 스킵 예외 발생 시 롤백되기 때문에 아무런 문제가 없다.

하지만, 디비 오퍼레이션이 아니라 외부 api 호출이나 이벤트 발행 등 트랜잭션 매니저에 의해 롤백되는 대상이 아닌 CUD 성 작업을 한다면 주의해야만 한다.


processorNonTransactional

프로세서에서 처리하는 작업이 트랜잭션 관리대상이 아니라 단순 외부 API 호출 정도라면 processorNonTransactional()을 사용할 수도 있다.

청크 기반 스텝을 생성하는 부분을 아래와 같이 변경 후 다시 실행해 보자

    @Bean
    @JobScope
    public Step testStep() {
        return stepBuilderFactory.get("testStep")
                .<Integer, Integer>chunk(3)
                .reader(listItemReader())
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .skip(IllegalArgumentException.class)
                .skipLimit(1000)
                .processorNonTransactional()	// <-- 추가된 부분
                .build();
    }

결과는 아래와 같다.

[PROCESSOR] 1
[PROCESSOR] 2
[PROCESSOR] 3
[WRITER] [1, 2, 3]
[PROCESSOR] 4
[PROCESSOR] 5	
[PROCESSOR] 6
[WRITER] [4, 6]	// <---- processor 제처리 없이 바로 writer 로 넘어왔다!
[PROCESSOR] 7
[PROCESSOR] 8
[PROCESSOR] 9
[WRITER] [7, 8, 9]
[PROCESSOR] 10
[WRITER] [10]

결과에서 확인할 수 있듯이 item 5 가 processor 로 들어간 후 스킵 예외가 발생했지만 item 4번에 대한 제처리 없이 writer 로 청크가 전달된 것을 확인할 수 있다.

processorNonTransactional()을 사용한다는 것은 스프링 배치에 해당 스텝의 프로세서는 트랜잭션 관리 대상에서 제외됨을 알려주는 의미이다.

트랜잭션 관리 대상에서 제외되기 때문에 롤백 된 item을 제처리 할 필요가 없고 그렇기 때문에 4번이 2번 수행되지 않은 것이다.

결론적으로 프로세서에 트랜잭션 관리에 포함되어야하는 작업이 없다면 processorNonTransactional()을 사용하는 것이 좋아 보인다.


writer에서 예외발생

이번에는 writer 코드를 아래와 같이 변경해 예외를 발생시켜 보자

    @StepScope
    public ItemWriter<Integer> writer() {
        return chunk -> {
            System.out.println("[WRITER] " + chunk);
            if (chunk.contains(5)) {
                throw new IllegalArgumentException();
            }
        };
    }

그리고 processorNonTransactional() 을 사용했을 때와 그렇지 않았을 때를 비교해 보자

// processorNonTransactional 사용 시 
[PROCESSOR] 1
[PROCESSOR] 2
[PROCESSOR] 3
[WRITER] [1, 2, 3]
[PROCESSOR] 4
[PROCESSOR] 5
[PROCESSOR] 6
[WRITER] [4, 5, 6]
[WRITER] [4]
[WRITER] [5]
[WRITER] [6]
[PROCESSOR] 7
[PROCESSOR] 8
[PROCESSOR] 9
[WRITER] [7, 8, 9]
[PROCESSOR] 10
[WRITER] [10]

// processorNonTransactional 사용 안할 시 
[PROCESSOR] 1
[PROCESSOR] 2
[PROCESSOR] 3
[WRITER] [1, 2, 3]
[PROCESSOR] 4
[PROCESSOR] 5
[PROCESSOR] 6
[WRITER] [4, 5, 6]
[PROCESSOR] 4
[WRITER] [4]
[PROCESSOR] 5
[WRITER] [5]
[PROCESSOR] 6
[WRITER] [6]
[PROCESSOR] 7
[PROCESSOR] 8
[PROCESSOR] 9
[WRITER] [7, 8, 9]
[PROCESSOR] 10
[WRITER] [10]

우선 writer 에서 스킵가능한 예외가 발생해도 processorNonTransactional() 은 의도에 맞게 잘 동작하는 것을 확인할 수 있다.

그리고 processorNonTransactional() 이 있더라도 writer 에는 item 4가 중복 처리 된 것을 확인할 수 있다.

아직까지 writer 에서 중복처리를 우회할 수 있는 방법은 잘 모르겠다. 

실제 프로젝트에는 writer 에 중복처리 되면 안되는 외부 api 나 이벤트 발생 작업을 최대한 프로세서 쪽으로 몰아서 사용하고 있다....

댓글