추후 이 DLT를 보고 개발자가 재시도해서
더 걸러내어 실제 어떤 문제가 있었는지 파악 할 수가 있어짐

수동 커밋의 예외처리
에러 핸들러로 메세지 제어권이 넘어가고 일정 재시도 횟수를 넘어가면 레코드를 포기하고 (Recover) Offset 을 Commit
전체적인 흐름
[1] Kafka Consumer 스레드가 메시지를 poll()로 가져움
↓
[2] @KafkaListener 메서드 호출
↓
[3] 비즈니스 로직 실행 중 예외 발생
↓
[4] Error Handler(DefaultErrorHandler)로 제어권 이동
↓
[5] Error Handler가 결정
- 재시도할지?
- 잠깐 기다렸다가 다시 시도할지(BackOff)?
- 더 이상 안 되겠다 → 포기(recover)할지?
- 포기한다면 DLT로 보낼지?
↓
[6] 최종적으로
- 재시도 중이면 → 같은 메시지 다시 리스너로 전달
- 포기라면 → Offset Commit + DLT 전송(옵션)
재시도, 포기 중의 제어처리
전체적인 흐름의 4,5,6 번의 처리에 대하여
기본 DefaultErrorHandler
동일 메세지에 대해 10번 재시도 (0ms 로 10번 재시도) 그래도 실패시 레코드를 포기하고 Commit
Dead Letter Topic (DLT)
재시도 하면 언젠가 성공하는 메세지
일정 시간 장애 발생
잠시 API 가 내려간 경우
재시도 해도 성공할 수 없는 메세지
잘못된 JSON 으로 온 메세지
이미 삭제 처리된 경우
존재하지 않는 요소로 인한 오류의 경우
이 2가지에 대해 구분 처리가 필요한데, 이 때 실패한 메세지들을 따로 모아두는 Topic이 있고
이 Topic 을 Dead Letter Topic 이라고 한다
DLT 의 토픽 이름 예시
DeadLetterPublishingRecover 라는 Spring Kafka 의 기본 설정을 사용시
보통 원본 토픽이름-dlt 라는 네이밍으로 DLT 의 이름이 붙는다
구현 코드
Kafka 설정부에 추가
// Kafka 설정부 (KafkaConfig)
@Bean
public ConsumerFactory<String, String> errorDemoConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "error-demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
// DefaultErrorHandler 커스텀 + DLT
@Bean
public CommonErrorHandler kafkaErrorHandlerWithDLT() {
// 1초 간격으로 최대 2번 추가 재시도 (총 3번 시도)
FixedBackOff backOff = new FixedBackOff(1000L, 2L);
// DLT 보내기
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(stringKafkaTemplate());
return new DefaultErrorHandler(deadLetterPublishingRecoverer, backOff);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> errorDemoWithDLTKafkaListenerContainerFactory(CommonErrorHandler kafkaErrorHandlerWithDLT) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(errorDemoConsumerFactory());
factory.setCommonErrorHandler(kafkaErrorHandlerWithDLT);
return factory;
}
Producer
@Service
@RequiredArgsConstructor
public class ErrDemoProducer {
private static final String ERR_DEMO_TOPIC = "error-demo";
private final KafkaTemplate<String, String> stringKafkaTemplate;
public void send(String message) {
stringKafkaTemplate.send(ERR_DEMO_TOPIC, message);
}
}
Listener
// 최종적으로 실패한 메세지는 DLT 로
@KafkaListener(
topics = "error-demo"
, groupId = "error-demo-group"
, containerFactory = "errorDemoWithDLTKafkaListenerContainerFactory"
)
public void consume(String message) {
log.info("[error-demo] receive Msg : {}", message);
if (message.contains("error")) {
log.info("error : {}", message);
throw new RuntimeException("테스트용 예외");
}
log.info("complete : {}", message);
}
API Controller
@RestController
@RequestMapping("/api/simple/error-demo")
@RequiredArgsConstructor
public class ErrDemoController {
private final ErrDemoProducer errDemoProducer;
@PostMapping
public ResponseEntity<Void> send(@RequestBody String message) {
errDemoProducer.send(message);
return ResponseEntity.ok().build();
}
}
실행 결과
Console
설정한대로 3번 재시도 + DLT 전송
Kafka-UI
토픽이름-dlt 로 Topic이 자동 생성되고 그 안에 에러 메세지들이 담겨 있음
💡
Share article