Consumer 가 Partition 별로 다르게 붙어서 사진은 Offset이 똑같지만,
사실 별개대로 읽혀도 상관이 없음, 독립적이기 때문

Offset
각 파티션에 메세지에 붙는 번호
Consumer Group 단위로 관리
어디까지 읽었는가에 대한 판단은 Consumer Group + Partition 조합으로 구분
→ Consumer Group 이 다르면 독립
💡
Commit
Offset 을 저장하는 것
Kafka 에 이 Offset 까지 읽었다고 기록하는 것
Kafka 는 기본적으로 at-least-once 전달 방식으로,
한번은 무조건 전달한다 라는 규칙을 지킴
너무 빨리 처리되어 유실,
너무 늦게 처리되어 중복처리 될 수 있지만 무조건 한번은 전달함기본적으로 Commit은 자동으로 처리
Kafka 는 @KafkaListener 뒤에서 Listener Conatiner 가 돌며 메세지를 전달하고 Offset 을 Commit 함
수동 Commit
비즈니스 로직이 중간에 첨부 되어 있을 경우 메세지가 전달되면 처리하고 나서 Commit 을 해야함
처리 과정 중 오류가 나면 Commit 처리되면 안되기 때문에 (재처리가 필요함)
처리 과정이 끝나고 DB 저장과 Offset 의 Commit 을 맞춰야하기 때문에
수동 Commit 구현
// Kafka 설정부 (Kafka Config)
// 수동 Offset Commit
@Bean
public ConsumerFactory<String, String> manualConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-manual-ack-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(manualConsumerFactory());
// 수동으로 Offset Commit
// 자동으로 Offset Commit 설정 부터 무효화
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
수동 커밋을 위한 Consumer Listener 등록
@Slf4j
@Component
public class ManualAckConsumerListener {
@KafkaListener(
topics = "simple-messages"
, groupId = "simple-manual-ack-group"
, containerFactory = "manualKafkaListenerContainerFactory"
)
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
String message = record.value(); // 이 과정도 Kafka 가 자동으로 처리해주고 있음
log.info("수동 커밋 모드, 받은 메세지 : {}", message);
if(message.equals("error")) {
throw new RuntimeException("kafka error");
}
// 수동 커밋 처리
ack.acknowledge();
// 다만 실패 했다고 그 offset 을 건너뛰고 처리하는게 아님
// 실패시 kafka 는 에러 핸들러로 처리하도록 offset 을 핸들러에게 처리하도록 넘김
// 일정 시간동안 일정 횟수를 재시도
// 그래도 실패시 그 offset 을 커밋 처리하고 건너 뛰어버림
// 그래서 처음 시도 후 재시도 횟수가 끝나면 current와 end의 offset이 동일
}
}
주석에도 써놨지만, 실제로 error 라는 message 를 담아 전송하면
첫 사진부터 postman 전송, console 로그들
예외 발생
→ Kafka 는 에러 핸들러에게 메세지 처리를 넘김
→ 일정 시간 동안 일정 횟수 재시도
→ 그래도 실패시 강제 Commit 후 처리 포기 or 처리 성공
의 처리를 진행하여, 포기하는 순간에 Offset 을 커밋 처리할 수 있음
그래서 결과적으로는 재처리 중에는 Offset 이 차이가 나더라도 결국엔 같아지도록 Kafka 가 처리 함 (실제 처리는 안되고 포기한 것)
Share article