inblog logo
|
LifeLog, DevLog
    Kafka

    Kafka 의 예외처리

    KYJTHEYJ's avatar
    KYJTHEYJ
    Apr 01, 2026
    Kafka 의 예외처리
    Contents
    수동 커밋의 예외처리전체적인 흐름재시도, 포기 중의 제어처리기본 DefaultErrorHandlerDead Letter Topic (DLT)DLT 의 토픽 이름 예시구현 코드실행 결과

    수동 커밋의 예외처리

    • 에러 핸들러로 메세지 제어권이 넘어가고 일정 재시도 횟수를 넘어가면 레코드를 포기하고 (Recover) Offset 을 Commit

    Kafka 에러 처리 공식문서

    전체적인 흐름

    [1] Kafka Consumer 스레드가 메시지를 poll()로 가져움
      ↓
    [2] @KafkaListener 메서드 호출
      ↓
    [3] 비즈니스 로직 실행 중 예외 발생
      ↓
    [4] Error Handler(DefaultErrorHandler)로 제어권 이동
      ↓
    [5] Error Handler가 결정
        - 재시도할지?
        - 잠깐 기다렸다가 다시 시도할지(BackOff)?
        - 더 이상 안 되겠다 → 포기(recover)할지?
        - 포기한다면 DLT로 보낼지?
      ↓
    [6] 최종적으로
        - 재시도 중이면 → 같은 메시지 다시 리스너로 전달
        - 포기라면 → Offset Commit + DLT 전송(옵션)
    

    재시도, 포기 중의 제어처리

    전체적인 흐름의 4,5,6 번의 처리에 대하여

    기본 DefaultErrorHandler

    • 동일 메세지에 대해 10번 재시도 (0ms 로 10번 재시도) 그래도 실패시 레코드를 포기하고 Commit

    Dead Letter Topic (DLT)

    • 재시도 하면 언젠가 성공하는 메세지

      • 일정 시간 장애 발생

      • 잠시 API 가 내려간 경우

    • 재시도 해도 성공할 수 없는 메세지

      • 잘못된 JSON 으로 온 메세지

      • 이미 삭제 처리된 경우

      • 존재하지 않는 요소로 인한 오류의 경우

    이 2가지에 대해 구분 처리가 필요한데, 이 때 실패한 메세지들을 따로 모아두는 Topic이 있고

    이 Topic 을 Dead Letter Topic 이라고 한다

    DLT 의 토픽 이름 예시

    DeadLetterPublishingRecover 라는 Spring Kafka 의 기본 설정을 사용시

    보통 원본 토픽이름-dlt 라는 네이밍으로 DLT 의 이름이 붙는다

    구현 코드

    • Kafka 설정부에 추가

    // Kafka 설정부 (KafkaConfig)
    @Bean
    public ConsumerFactory<String, String> errorDemoConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
    
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "error-demo-group");
    
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    // DefaultErrorHandler 커스텀 + DLT
    @Bean
    public CommonErrorHandler kafkaErrorHandlerWithDLT() {
        // 1초 간격으로 최대 2번 추가 재시도 (총 3번 시도)
        FixedBackOff backOff = new FixedBackOff(1000L, 2L);
    
        // DLT 보내기
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(stringKafkaTemplate());
    
        return new DefaultErrorHandler(deadLetterPublishingRecoverer, backOff);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> errorDemoWithDLTKafkaListenerContainerFactory(CommonErrorHandler kafkaErrorHandlerWithDLT) {
    
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    
        factory.setConsumerFactory(errorDemoConsumerFactory());
        factory.setCommonErrorHandler(kafkaErrorHandlerWithDLT);
        return factory;
    }
    
    • Producer

    @Service
    @RequiredArgsConstructor
    public class ErrDemoProducer {
        private static final String ERR_DEMO_TOPIC = "error-demo";
    
        private final KafkaTemplate<String, String> stringKafkaTemplate;
    
        public void send(String message) {
            stringKafkaTemplate.send(ERR_DEMO_TOPIC, message);
        }
    }
    
    • Listener

    // 최종적으로 실패한 메세지는 DLT 로
    @KafkaListener(
            topics = "error-demo"
            , groupId = "error-demo-group"
            , containerFactory = "errorDemoWithDLTKafkaListenerContainerFactory"
    )
    public void consume(String message) {
        log.info("[error-demo] receive Msg : {}", message);
    
        if (message.contains("error")) {
            log.info("error : {}", message);
            throw new RuntimeException("테스트용 예외");
        }
    
        log.info("complete : {}", message);
    }
    
    • API Controller

    @RestController
    @RequestMapping("/api/simple/error-demo")
    @RequiredArgsConstructor
    public class ErrDemoController {
        private final ErrDemoProducer errDemoProducer;
    
        @PostMapping
        public ResponseEntity<Void> send(@RequestBody String message) {
            errDemoProducer.send(message);
            return ResponseEntity.ok().build();
        }
    }
    

    실행 결과

    • Console

      • 설정한대로 3번 재시도 + DLT 전송

    • Kafka-UI

      • 토픽이름-dlt 로 Topic이 자동 생성되고 그 안에 에러 메세지들이 담겨 있음

    💡

    추후 이 DLT를 보고 개발자가 재시도해서
    더 걸러내어 실제 어떤 문제가 있었는지 파악 할 수가 있어짐

    Share article
    Contents
    수동 커밋의 예외처리전체적인 흐름재시도, 포기 중의 제어처리기본 DefaultErrorHandlerDead Letter Topic (DLT)DLT 의 토픽 이름 예시구현 코드실행 결과

    LifeLog, DevLog - https://github.com/KYJTHEYJ

    RSS·Powered by Inblog