Spring Batch

Spring Batch 실패를 다루는 기술 - ItemStream

제리 . 2023. 1. 7. 22:18

들어가며

배치 작업을 실행하다 보면 실패 상황에 빈번하게 직면하게 됩니다. 배치를 실패한 경우 단순하게 재시작을 해서 처리하는 방법도 있지만, 시간이 오래 걸리는 배치의 경우 처음부터 다시 실행하는 게 쉽지 않습니다. 다행히 Spring Batch에서는 ItemStream을 제공합니다. 이번에는 ItemStream을 활용하여 어떻게 재시작 관점에서의 문제를 해결할 수 있는지 정리해 봤습니다.

BATCH_STEP_EXECUTION_CONTEXT 테이블

ItemStream을 살펴보기 전 BATCH_STEP_EXECUTION_CONTEXT 테이블을 설명드리겠습니다.

이 테이블은 Spring Batch에서 관리하는 메타데이터 테이블 중 하나입니다.

Spring Batch 공식 문서에서는 이 테이블을 다음과 같이 소개합니다.

해석해보자면 이렇습니다.

  • 이 테이블은 Step에 대해 ExecutionContext에 대한 정보를 보유하고 있음
  • StepExecution에 대해 하나의 ExecutionContext가 존재함
  • 일반적으로 JobInstance가 "중단된 위치에서 시작"할 수 있도록 실패 후 검색해야 하는 상태를 나타냄

즉, step의 실행 상태를 저장하여 중단된 위치에서 다시 실행할 수 있도록 사용되는 데이터를 의미합니다.

ItemStream이란?

ItemStream은 하나의 인터페이스입니다. 이 인터페이스는 아래와 같은 기능을 수행합니다.

상태를 주기적으로 저장하고 오류가 발생할 경우 해당 상태에서 복원하기 위한 내용을 정의하는 마커 인터페이스.

어딘가 익숙하지 않으신가요? 앞서 소개했던 BATCH_STEP_EXECUTION_CONTEXT과 연관이 있습니다. ItemStream를 사용하는 주된 목적은 StepExecutionContext에 실행상태를 저장하기 위해서입니다. ItemStream에서 제공하는 open, update 같은 메서드의 파라미터로서 ExecutionContext가 제공되는데, 이걸 활용하면 map을 다루는 것처럼 테이블의 데이터를 읽거나 쓸 수 있습니다. 

 

대략적인 그림이 그려졌다면 ItemStream메서드에 대해서 간단히 설명드리겠습니다.

  • open : step이 실행될 때 호출되는 메서드입니다. 주로 저장된 실행상태를 읽어올 때 사용합니다.
  • update : ExecutionContext를 업데이트할 때 활용되는 메서드입니다.
  • close :  step이 종료될 때 호출되는 메서드입니다. 자원을 반환할 때 활용되곤 합니다.

이러한 ItemStream의 메서드를 어디에서 구현하고 적용해야 할까요?

ItemStreamReader, ItemStreamWriter

Spring Batch에서는 ItemStream을 결합한 ItemStreamReader, ItemStreamWriter 인터페이스가 존재합니다.

ItemStream을 사용하고 싶으시다면 ItemReader, ItemWriter대신 이 인터페이스를 구현하시면 됩니다.

둘 다 구현이 필요한가요?

상황에 따라서 다르겠지만 대부분의 경우 한 가지의 인터페이스를 구현하여 처리할 수 있습니다.

그렇다면 어떤 인터페이스를 구현해야 할까요? 둘 중 구현하기 쉬운 쪽을 선택하는 것이 좋습니다.

Reader, Writer의 로직에 따라서 상태를 저장하기 쉬운 곳에서 ItemStream을 구현하시면 됩니다.

(일반적인 상황에서는 ItemReader에서 상태를 저장하기 쉽고, Reader에서 itemStream open을 사용하여 불러온 시작 지점을 사용하기때문에 ItemSteamReader를 우선 고려하기도합니다)

 

ItemStreamReader만 사용하면 write 완료 상태를 적절하게 저장하지 못하고 read완료 상태를 저장하게 되는게 아니냐고 생각하실 수 있지만, 꼭 그렇지는 않습니다.

그 이유에 대해서 설명드리겠습니다. ItemStream에 대해서 익숙하신 분들도 간혹 잘 못 알고 계시는 경우가 있는데 바로 ItemStream의 처리 순서입니다.

예를 들어, 이러한 순서로 ItemStream이 처리된다고 생각하시곤 합니다.

실행 상태를 마킹하는 경우 writer로직 이후 호출되는 ItemStreamWriter에서 update를 하는 것이 더 논리적으로 적합해 보입니다.

하지만 실제로 ItemStream은 이 순서로 동작하지 않습니다.

ItemStream 실행 과정

Step에서 ItemStream이 처리되는 과정은 크게 3단계로 나눠져 있습니다.

(코드상으로는 AbstractStep, TaskletStep 클래스를 확인하시면 도움이 되실 겁니다.)

초기화

Step이 실행되면 등록된 ItemStream의 open을 먼저 실행합니다. 이후 ItemStream의 update가 호출됩니다.

chunk 처리 반복

chunk 처리 반복 단계에서는 read → process → write가 처리되는 tasklet 실행과 itemStream의 update가 반복됩니다.

stream update가 tasklet처리 이후에 실행되기 때문에 ItemStreamReader, ItemStreamWriter 어느 쪽에서 update를 구현하던지 순서상으로는 크게 의미가 없습니다. 이런 이유로 상태 저장을 하기 쉬운 쪽, open을 통해 불러온 상태를 사용하기 쉬운쪽에서 구현하시는 것이 좋습니다.

종료

itemStream의 close가 순차적으로 호출되면서 종료됩니다.

 

전체적인 순서는 다음과 같습니다.

참고) stream 인터페이스를 구현하는 경우 위에서 말씀드린 것처럼 chunk처리와는 별개의 context에서 이뤄지게 됩니다. 그런 이유로 chunk를 예외를 처리하는 방식 중 skip, retry는 stream로직에는 적용되지 않습니다. stream을 구현한 로직에서 예외가 발생하면 배치는 종료되므로 가능한 stream로직은 복잡하지 않게 구현하시는 걸 추천드립니다. stream 구현에 try-catch를 사용하여 내부적으로 예외처리를 하는 것도 좋은 방법입니다.

ItemStream를 적용한 Job 예제

이제 어떻게 ItemStream이 동작하는지 확인해봤으니 코드상으로 어떻게 활용하면 되는지 간단한 예제를 통해 보여드리겠습니다.

@Bean
fun itemStreamJob(): Job {
    return jobBuilderFactory["itemStreamJob"]
        .start(itemStreamStep())
        .build()
}

fun itemStreamStep(): Step {
    return stepBuilderFactory["itemStreamStep"]
        .chunk<Int, Int>(3)
        .reader(itemStreamReader())
        .writer { log.info("write items ($it)") }
        .build()
}

fun itemStreamReader(): ItemStreamReader<Int> {
    return object : ItemStreamReader<Int> {
        private var offset = 0
        private lateinit var queue: Queue<Int>

        override fun open(executionContext: ExecutionContext) {
            val initQueue = listOf(1, 2, 3, 4, 5)
            offset = executionContext.getInt("READ_OFFSET", 0)
            queue = LinkedList(initQueue.subList(offset, initQueue.size))
            log.info("itemStream open offset : $offset")
        }

        override fun update(executionContext: ExecutionContext) {
            executionContext.put("READ_OFFSET", offset)
            log.info("itemStream update offset : $offset")
        }

        override fun close() {
            log.info("itemStream close")
        }

        override fun read(): Int? {
            val result = queue.poll()
            offset++
            log.info("read : $result, offset: $offset")
            return result
        }
    }
}

실행 결과는 다음과 같습니다.

우선, 초기화 단계로서 ItemStream의 open, update가 수행됐습니다.

그 이후로는 chunk 처리 반복 단계인 tasklet 실행 + stream update가 수행됐습니다. update는 tasklet실행(chunk처리)이 종료된 후 발생하는 것을 확인할 수 있습니다.

마지막으로 종료단계인 close를 호출하고 종료됐습니다.

 

BATCH_STEP_EXECUTION_CONTEXT 테이블을 확인해 보겠습니다.

READ_OFFSET을 key로 데이터가 잘 들어가 있는 것을 확인할 수 있습니다.

 

이번에는 실패 케이스를 만들어 볼까요?

override fun read(): Int? {
    val result = queue.poll()
    if(result == 5) throw RuntimeException("배치 예외발생") // 임시 예외 발생
    offset++
    log.info("read : $result, offset: $offset")
    return result
}

배치가 실패됐습니다.

BATCH_STEP_EXECUTION_CONTEXT 테이블을 확인해 보겠습니다.

READ_OFFSET이 3으로 저장되어 있습니다. 다시 실행할 때는 예외 코드를 제거하고 재시작해보겠습니다.

open에 offset을 3으로 읽어오고 있습니다. open에서 배치 테이블에 저장된 offset을 읽어와 시작지점을 설정한 것을 확인할 수 있습니다.

Tasklet Step에서도 이런 기술을 적용할 수 있나요?

ItemStream은 chunk oriented step에 대부분 사용되지만 Tasklet Step에서도 사용할 수는 있습니다.

예를 들어 아래와 같이 코드를 작성하면 stream을 등록할 수 있습니다.

fun step(): Step {
        return stepBuilderFactory["step"]
            .tasklet { _, _ ->
                ...
            }
            .stream(object : ItemStream {
                override fun open(executionContext: ExecutionContext) {
                    ...
                }

                override fun update(executionContext: ExecutionContext) {
                    ...
                }

                override fun close() {
                    ...
                }
            })
            .build()
    }

그런데 이 방법은 추천드리지 않습니다. stream과 배치 로직이 분리되어있으면 실패지점에 대한 상태를 저장하기가 까다로워집니다.

ItemStream을 사용하는 목적이 ExecutionContext에 상태값을 저장하는 것이라면 여기서는 다른 방법을 적용해 볼 수 있습니다.

fun step(): Step {
        return stepBuilderFactory["step"]
            .tasklet { stepContribution, chunkContext ->
                stepContribution.stepExecution.executionContext.put(key, value)
								...
            }
            .build()
    }

tasklet의 stepContribution 또는 chunkContext를 활용하면 stepExecutionContext에 접근할 수 있습니다.

이 방식은 stream이 아니므로 tasklet 실행 맥락에서 executionContext가 update 된다는 차이가 있습니다.

다양한 구현체에서 사용되는 ItemStream

이번에는 여러분들에게 익숙한 ItemReader 구현체들도 ItemStream을 구현하고 있다는 점을 말씀드리려고 합니다.

예시로 JpaPagingItemReader, JdbcPagingItemReader 두 구현체는 공통적으로 AbstractPagingItemReader를 상속받고 있습니다.

AbstractPagingItemReader는 ItemStream인터페이스를 구현하고있습니다. 각 구현체에서 어떻게 재시작 지점을 지정하고 있는지 확인해보셔도 좋을 것 같습니다.

ItemStream만 있으면 재실행은 걱정 없나요?

제가 지금까지 설명드린 ItemStream을 보면 재실행에 대해 만능처럼 보일 수 있지만 사실은 그렇지는 않습니다. 몇 가지 상황에 대해서 한계점을 설명드리겠습니다.

Job 비정상 종료 처리 시 재시작 불가능 문제

Spring Batch의 기본 개념은 실패한 Job을 재시작한다는 점입니다. 이것이 이뤄지기 위해서는 배치 메타테이블에서 Job의 상태가 실패로 마킹되어있어야 합니다. 하지만 때로는 Job이 비정상적으로 종료되어 Job이 실패 마킹이 되지 않는 경우가 종종 발생하게 됩니다. 이런 상황에서는 재시작이 불가능하므로 ItemStream도 의미가 없어지게 됩니다. 해결 방법으로는 메타테이블의 데이터를 수정하고 재시작할 수 도 있지만 꽤나 번거롭습니다. 이런 경우에는 배치 파라미터를 통해 값을 전달받아 ItemStream로직처럼 활용하게 되면 유연한 대처가 가능합니다. 

재처리 발생 가능성

위에서 보여드린 예시에서 조금 이상한 점을 발견하셨을지 모르겠습니다. log를 자세히 보시면 배치를 재시작했을 경우 데이터가 재처리되는 부분이 존재합니다. 이는 chunk size만큼 모두 데이터가 처리된 후 stream을 update 하기 때문에 chunk를 일부 처리하고 배치 실패 시 일부 처리된 상태가 update 되지 않습니다. 사실 chunk oriented batch를 작성하다 보면 중복 데이터 재처리는 꽤나 자주 만나는 상황입니다. 가능한 배치 로직 자체를 중복 실행해도 문제가 없도록 작성해주시면 좋습니다. 만약 이게 불가능하다면 redis에 마킹을 하는 방식으로 해결하거나 chunk size를 조절하는 등 고민이 필요해 보입니다.

chunk 병렬 처리 시 사용하기 힘듭니다.

stream을 업데이트하는 단위는 chunk를 처리하는 단위와 같습니다. chunk를 병렬처리하는 기술 중 muti thread step방식이나 step-partitioning을 사용하면 chunk가 순서 보장 없이 병렬로 실행되기 때문에 ItemStream update의 순서를 보장할 수 없게 됩니다. 이렇게 되면 배치 재시작 시 누락되거나 재처리되는 데이터가 발생하기 쉽습니다.

 

마치며

배치 메타테이블을 활용해서 배치 실패에 대한 재시작에 대응할 수 있는 ItemStream에 대해서 알아봤습니다. 개인적으로는 배치 실행 로그 확인할 필요 없이 재시작점을 설정해주기 때문에 편리하게 사용하는 방법입니다. 말씀드린 것처럼 itemStream의 한계점도 있지만 적절하게 활용하시면 대부분의 상황에서는 작성하시는 배치의 퀄리티를 더 높여줄 수 있는 좋은 방법이라고 생각됩니다. 감사합니다.