inblog logo
|
LifeLog, DevLog
    Kafka

    Commit

    KYJTHEYJ's avatar
    KYJTHEYJ
    Apr 01, 2026
    Commit
    Contents
    OffsetCommit수동 Commit수동 Commit 구현

    Offset

    각 파티션에 메세지에 붙는 번호

    Consumer Group 단위로 관리

    어디까지 읽었는가에 대한 판단은 Consumer Group + Partition 조합으로 구분
    → Consumer Group 이 다르면 독립

    💡

    Consumer 가 Partition 별로 다르게 붙어서 사진은 Offset이 똑같지만,
    사실 별개대로 읽혀도 상관이 없음, 독립적이기 때문

    Commit

    Offset 을 저장하는 것

    Kafka 에 이 Offset 까지 읽었다고 기록하는 것

    Kafka 는 기본적으로 at-least-once 전달 방식으로,
    한번은 무조건 전달한다 라는 규칙을 지킴

    • 너무 빨리 처리되어 유실,
      너무 늦게 처리되어 중복처리 될 수 있지만 무조건 한번은 전달함

    • 기본적으로 Commit은 자동으로 처리

      • Kafka 는 @KafkaListener 뒤에서 Listener Conatiner 가 돌며 메세지를 전달하고 Offset 을 Commit 함

    수동 Commit

    비즈니스 로직이 중간에 첨부 되어 있을 경우 메세지가 전달되면 처리하고 나서 Commit 을 해야함

    • 처리 과정 중 오류가 나면 Commit 처리되면 안되기 때문에 (재처리가 필요함)

    • 처리 과정이 끝나고 DB 저장과 Offset 의 Commit 을 맞춰야하기 때문에

    수동 Commit 구현

     // Kafka 설정부 (Kafka Config)
     // 수동 Offset Commit
      @Bean
      public ConsumerFactory<String, String> manualConsumerFactory() {
          Map<String, Object> props = new HashMap<>();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-manual-ack-group");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    
          return new DefaultKafkaConsumerFactory<>(props);
      }
    
      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, String> manualKafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(manualConsumerFactory());
    
          // 수동으로 Offset Commit
          // 자동으로 Offset Commit 설정 부터 무효화
          factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    
          return factory;
      }
    
    • 수동 커밋을 위한 Consumer Listener 등록

    @Slf4j
    @Component
    public class ManualAckConsumerListener {
        @KafkaListener(
                topics = "simple-messages"
                , groupId = "simple-manual-ack-group"
                , containerFactory = "manualKafkaListenerContainerFactory"
        )
        public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
            String message = record.value(); // 이 과정도 Kafka 가 자동으로 처리해주고 있음
            
            log.info("수동 커밋 모드, 받은 메세지 : {}", message);
    
            if(message.equals("error")) {
                throw new RuntimeException("kafka error");
            }
    
            // 수동 커밋 처리
            ack.acknowledge();
    
            // 다만 실패 했다고 그 offset 을 건너뛰고 처리하는게 아님
            // 실패시 kafka 는 에러 핸들러로 처리하도록 offset 을 핸들러에게 처리하도록 넘김
            // 일정 시간동안 일정 횟수를 재시도
            // 그래도 실패시 그 offset 을 커밋 처리하고 건너 뛰어버림
            // 그래서 처음 시도 후 재시도 횟수가 끝나면 current와 end의 offset이 동일
        }
    }
    

    주석에도 써놨지만, 실제로 error 라는 message 를 담아 전송하면

    첫 사진부터 postman 전송, console 로그들

    예외 발생 
    → Kafka 는 에러 핸들러에게 메세지 처리를 넘김 
    → 일정 시간 동안 일정 횟수 재시도 
    → 그래도 실패시 강제 Commit 후 처리 포기 or 처리 성공
    

    의 처리를 진행하여, 포기하는 순간에 Offset 을 커밋 처리할 수 있음

    그래서 결과적으로는 재처리 중에는 Offset 이 차이가 나더라도 결국엔 같아지도록 Kafka 가 처리 함 (실제 처리는 안되고 포기한 것)

    Share article
    Contents
    OffsetCommit수동 Commit수동 Commit 구현

    LifeLog, DevLog - https://github.com/KYJTHEYJ

    RSS·Powered by Inblog