제리 devlog

회원가입으로 보는 Event Driven Architecture 구현 본문

개발

회원가입으로 보는 Event Driven Architecture 구현

제리 . 2023. 1. 7. 01:39

들어가며

최근 event기반 아키텍처에 관한 흥미로운 글을 읽었습니다.

요약하자면 이벤트 기반으로 느슨한 결합을 만드는 방식에 관한 내용을 담고 있는데 실제로 한번 구현을 해보면 재밌을 것 같다는 생각이 들었습니다. 그래서 유저 회원 가입 상황에 대해서 이벤트 기반으로 관심사를 분리하여 느슨한 결합을 만드는 구조를 한번 코드로 구현해 봤습니다.

 

회원 가입 요청이 들어오면

A서비스에서는 회원 가입이 완료되면 회원 가입 알림 메시지와 신규 유저 쿠폰이 발급됩니다.

msa로 user, notification, coupon서비스가 각각 분리되어있다면 다음과 같이 처리할 수 있습니다.

@Transactional
fun createUser(createUserInput: CreateUserInput) {
    val newUser = userRepository.saveUser(createUserInput)
    couponApiService.createNewUserCoupon(newUser)
    notificationApiService.sendNewUserNotification(newUser)
}

먼저, 유저 서비스 DB에 회원 정보를 저장합니다.

그리고 쿠폰 서비스에 신규 유저 쿠폰 발급 요청을 보냅니다.

마지막으로 알림 서비스에 회원가입 알림 요청을 보냅니다.

 

간단해 보이는 이 코드에는 몇 가지 문제점이 있습니다.

 

트랜잭션 롤백

유저 정보를 저장하고, 쿠폰을 발급하고, 알림을 보내는 건 하나의 트랜잭션으로 묶여있습니다.

유저 서비스 DB에 회원 정보를 저장하는 데 성공했지만 회원 가입 알림 전송에 실패하면 어떻게 될까요?

회원 정보 저장이 같이 롤백됩니다. 회원 가입 알림 전송이 실패했다고 회원 가입에 실패하는 건 조금 이상한 것 같습니다.

또한, 외부 서비스가 느려지게 된다면 트랜잭션이 길어지는 문제도 발생할 수 있습니다.

 

외부 도메인 의존성

두 번째 문제는 유저 서비스는 회원 가입 처리를 위해서 신규 쿠폰 발급, 회원가입 알림과 같은 다른 도메인의 서비스를 호출해야 합니다. 이는 유저 서비스에서 다른 도메인에 대한 의존성을 가지게 되어 느슨한 결합에 영향을 주게 됩니다.

 

자, 그러면 코드를 약간 수정해서 외부 요청을 별도의 스레드로 실행해 보겠습니다.

@Transactional
fun createUser(createUserInput: CreateUserInput) {
    val newUser = userRepository.saveUser(createUserInput)
    couponApiService.createNewUserCouponAsync(newUser) // Async
    notificationApiService.sendNewUserNotificationAsync(newUser) // Async
}

별도의 스레드에서 실행하게 되면 트랜잭션의 범위 밖에서 처리되기 때문에 트랜잭션으로 인한 문제가 해결됩니다.

하지만 여전히 회원 가입에 외부 도메인을 호출해야 한다는 문제는 남아있습니다.

더불어 별도의 스레드에서 예외가 발생하는 경우 처리되어야 할 로직이 누락되는 경우가 발생하기 쉽습니다.

 

유저 이벤트 생성

회원가입이 발생하면 메세징 시스템으로 이벤트를 발행하는 구조입니다.

변경된 회원 가입 과정을 보겠습니다.

 

유저 서비스

@Transactional
fun createUser(createUserInput: CreateUserInput) {
    val newUser = userRepository.saveUser(createUserInput)
    publishEvent(newUser)
}

유저 서비스에서 신규 쿠폰 발급, 회원가입 알림을 요청하는 코드가 사라졌습니다.

대신 유저 생성 이벤트를 발행하는 코드가 추가됐습니다.

이제 유저 서비스는 다른 도메인을 신경 쓸 필요 없이 유저를 생성하고 이벤트를 발행하면 됩니다.

 

이벤트 포맷

data class UserEventPayload(
    val id: Long,
    val action: Action
)

enum class Action {
    CREATE, UPDATE
}

이벤트는 특정한 목적을 가지고 메시지를 구성하지 않는 게 좋습니다.

특정한 목적을 가지고 메시지를 구성하게 되면 메시지 변경에 취약합니다.

메시지가 변경되면 발행자와 수신자의 코드가 모두 영향을 받습니다.

따라서 zero payload 방식처럼 최소한의 id정보를 가지고 메시지를 구성하면 결합을 낮춰주는 효과가 있습니다.

 

쿠폰 서비스

@KafkaListener(topics = ["user..."],)
fun userEventListener(records: List<ConsumerRecord<String, String>>) {
    issueNewUserCoupon(records)
}

쿠폰 서비스에서는 유저 이벤트를 수신하여 신규 유저 쿠폰을 발행합니다.

참고로 @KafkaListener는 spring-kafka 모듈에서 제공해주는 어노테이션으로 특정 토픽의 컨슈머로서 동작할 수 있게해줍니다.

 

알림 서비스

@KafkaListener(topics = ["user..."],)
fun userEventListener(records: List<ConsumerRecord<String, String>>) {
    notifyNewUser(records)
}

알림 서비스에서도 유저 이벤트를 수신하여 회원가입 알림을 전송합니다.

 

이벤트 발행 시점(트랜잭션 커밋 전? 후?)

다시 유저 서비스에서 유저 이벤트를 발생시키는 상황으로 돌아오겠습니다.

@Transactional
fun createUser(createUserInput: CreateUserInput) {
    val newUser = userRepository.saveUser(createUserInput)
    publishEvent(newUser)
}

이벤트를 발행하는 상황은 트랜잭션의 어떤 phase에서 처리해야 할까요?

BEFORE_COMMIT? AFTER_COMMIT?

고민을 해보기 전에 스프링에서 제공하는 @TransactionalEventListener를 소개하겠습니다.

@TransactionalEventListener는 트랜잭션 phase에 따라 이벤트를 리스닝할 수 있는 기능을 지원합니다. 이 어노테이션을 적용하면 다음과 같이 코드를 작성할 수 있습니다.

@Transactional
fun createUser(createUserInput: CreateUserInput) {
    val newUser = userRepository.saveUser(createUserInput)
    applicationEventPublisher.publishEvent(newUser)
}

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
fun publishUserEvent(newUser: UserEventPayload) {
    publishEvent(newUser)
}

혹은 JPA를 사용하신다면 @PrePersist @PostPersist 같은 어노테이션을 사용해서 데이터가 저장되기 전후 동작에 대해서 정의할 수도 있습니다.

 

트랜잭션 커밋 전 이벤트 발행(BEFORE_COMMIT)

그럼 이제 이벤트 발행이 트랜잭션에 묶여있는 상황을 보겠습니다. 

 

뭔가 이상하지 않으신가요? 이벤트 발행은 외부 메시징 시스템과 통신하게 됩니다. 만약 외부 메시징 시스템이 정상적으로 동작하지 않는다면 트랜잭션처리가 느려지거나 실패할 수 있습니다.

가장 처음 외부 도메인의 api를 호출했던 경우의 문제점이 다시 살아났습니다.

 

트랜잭션 커밋 후 이벤트 발행(AFTER_COMMIT)

이번에는 트랜잭션 이후 이벤트를 발행하는 상황에 대해서 보겠습니다.(AFTER_COMMIT)

트랜잭션 범위 밖에서 이벤트를 발행하므로 외부 시스템의 영향도와 트랜잭션이 분리되었습니다.

하지만 회원 가입 트랜잭션이 성공한다고 해서 반드시 이벤트를 발행한다는 보장이 사라지게 됐습니다.

 

이벤트 발행 구조

우리의 목표는 메시징 시스템으로 이벤트 발행이 트랜잭션에 영향을 주지 않으면서 발행을 보장하는 것입니다.

따라서, 유저 이벤트 자체를 저장하는 과정을 트랜잭션에 포함시켜 보겠습니다.

유저 이벤트는 유저 저장과 같이 RDS에서 저장한다면 트랜잭션으로 쉽게 묶어 처리할 수 있습니다.

또한, 유저 이벤트 테이블에 이벤트 발행 여부에 대한 flag를 추가하면 발행이 정상적으로 이뤄졌는지 확인할 수 있습니다.

이벤트 발행은 이제 2단계로 구성됩니다.

 

내부 이벤트 발행

첫 번째는 회원 가입 트랜잭션 내부에서는 유저 이벤트 저장을 위한 이벤트를 발행합니다.

@Transactional
fun createUser(createUserInput: CreateUserInput) {
    val newUser = userRepository.saveUser(createUserInput)
    applicationEventPublisher.publishEvent(newUser)
}

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
fun createUserEvent(newUser: UserEventPayload) {
    userEventRepository.saveUserEvent(newUser)
}

트랜잭션 리스너를 BEFORE_COMMIT단계로 지정하여 유저 이벤트 저장을 트랜잭션 범위에 포함시켰습니다.

새로 저장되는 유저 이벤트는 isPublished=false상태로 저장됩니다.

이제 회원가입 성공 시 유저 이벤트가 저장되는 것을 보장할 수 있습니다.

 

외부 이벤트 발행

두 번째로 외부 메시징 시스템으로 이벤트를 발행하는 단계를 보겠습니다.

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
fun createUserEvent(newUser: UserEventPayload) {
    val newUserEvent = userEventRepository.saveUserEvent(newUser)
    applicationEventPublisher.publishEvent(newUserEvent)
}

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun publishUserEvent(newUserEvent: ExternalUserEventPayload) {
    publishEvent(newUserEvent)
    updateUserEventPublishTrue(newUserEvent)
}

유저 이벤트를 저장하는 로직에 이벤트 발행을 추가하였습니다.

이 이벤트는 AFTER_COMMIT단계에서 이뤄지고 별도의 스레드로 동작하므로 기존의 트랜잭션에 영향을 주지 않습니다.

성공적으로 외부 시스템에 이벤트를 발행하면 유저 이벤트의 isPublished = true로 업데이트됩니다.

이것으로 회원가입 시 유저 이벤트를 반드시 저장하면서 외부 메시징 시스템으로 이벤트 발행은 트랜잭션과 분리될 수 있는 구조가 완성됐습니다.

발행되지 않은 이벤트 재발행 배치

유저 이벤트가 저장된다고 해서 유저 이벤트가 외부 메시징 시스템으로 반드시 발행된다는 것을 보장하지는 않습니다. 따라서 유저 이벤트의 isPublished=false상태를 추적해 주기적으로 이벤트를 다시 발급시키는 배치를 구성할 수 있습니다.

@Bean
fun publishUserEventJob(): Job {
    return jobBuilderFactory["publishUserEventJob"]
        .start(publishUserEventStep())
        .build()
}

fun publishUserEventStep(): Step {
        return stepBuilderFactory["publishUserEventStep"]
            .tasklet { _, _ ->
                val userEventList = userEventService.findNotPublishUserEvent()
                if (userEventList.isNotEmpty()) {
                    userEventList.forEach { userEvent ->
                        userEventService.publishUserEvent(userEvent)
                    }
                    return@tasklet RepeatStatus.CONTINUABLE
                }
                return@tasklet RepeatStatus.FINISHED
            }
            .build()
    }

스프링 배치에서 처리할 때는 다소 주의사항이 있습니다.

스프링 배치는 별도로 데이터 소스를 설정하지 않으면 사용하는 데이터 소스를 공유합니다.

이러면 tasklet에서 처리되는 로직은 스프링 배치의 트랜잭션 하위에서 실행되게 됩니다.

만약, 여러 이벤트를 한 번에 트랜잭션에서 발행한다면 롤백에 주의해야 합니다.

외부 시스템에 이벤트 발행에 성공했지만 isPublished=false로 롤백될 수 있습니다.

만약, 이벤트 발행마다 트랜잭션을 별도로 처리하고 싶다면 트랜잭션의 전파 속성을 변경해주면 됩니다.

@Transactional(propagation = Propagation.REQUIRES_NEW)
fun updateUserEventPublishTrue(..) {
	...
}

별도로 옵션을 설정하지 않으면 부모 트랜잭션에 포함되기 때문에 스프링 배치의 트랜잭션 범위에 포함되지만

REQUIRES_NEW 옵션을 주게 되면 매번 새로운 트랜잭션을 생성하여 처리합니다.

한 건씩 읽어서 이벤트를 읽고 처리하는 방법도 있지만, 그렇게 되면 배치 테이블이 업데이트되는 단위가 짧아져 처리시간이 더 길어질 수 있습니다.

 

마치며

가장 처음에 소개드린 글에 상당히 자세하게 관련 내용에 대한 설명이 나와있어서 어렵지 않게 구현해볼 수 있었습니다.

주로 이벤트를 소비하는 주체로서는 자주 개발을 했던 것 같은데 이벤트를 발행하는 주체로서는 몰랐던 부분이 많았던 것 같습니다.

이벤트 발행에 대해서 좋은 내용을 익힐 수 있는 경험이었고, 정확한 이벤트 발행이 필요한 기회가 있다면 이러한 방식을 팀 내에서도 한번 적용해보면 좋을 것 같다는 생각이 들었습니다. 구현 코드는 여기서 확인하실 수 있습니다.

Comments