
Spring Boot + Kafka
스프링부트 환경에서 Kafka 사용
Gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}
application.yml
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
docker-compose.yml
services:
# --------------------------
# Broker 1 (Node ID: 1)
# --------------------------
kafka-1:
image: apache/kafka:3.7.0
container_name: kafka-1
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
# listeners
KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# replication configs
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka1_data:/var/lib/kafka/data
networks:
- kafka-net
# --------------------------
# Broker 2 (Node ID: 2)
# --------------------------
kafka-2:
image: apache/kafka:3.7.0
container_name: kafka-2
ports:
- "9093:9092"
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
# listeners
KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# replication configs
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka2_data:/var/lib/kafka/data
networks:
- kafka-net
# --------------------------
# Broker 3 (Node ID: 3)
# --------------------------
kafka-3:
image: apache/kafka:3.7.0
container_name: kafka-3
ports:
- "9094:9092"
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
# listeners
KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
# replication configs
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka3_data:/var/lib/kafka/data
networks:
- kafka-net
# --------------------------
# Kafka UI
# --------------------------
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8088:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local-kraft-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
KAFKA_CLUSTERS_0_METADATAPROVIDER: "KAFKA"
DYNAMIC_CONFIG_ENABLED: true
depends_on:
- kafka-1
- kafka-2
- kafka-3
networks:
- kafka-net
volumes:
kafka1_data:
kafka2_data:
kafka3_data:
networks:
kafka-net:
driver: bridge
KafkaConfig
Producer 와 Consumer 설정
producer
ProducerFactory<String, String>Producer가 사용할 설정 정보를 선언
key와 value를 모두 문자열로 직렬화하기 위해
StringSerializer를 사용 (Apache Kafka)
KafkaTemplate<String, String>Producer 대신 실제로
send()를 호출하는 역할Kafka와 Spring boot가 통신하는 객체
Consumer
ConsumerFactory<String, String>Consumer가 어떤 방식으로 메시지를 읽을지 설정
key와 value를 문자열로 역직렬화하기 위해
StringDeserializer를 사용GROUP_ID_CONFIG = "simple-group"→ 이 Consumer가 속한 그룹 이름
AUTO_OFFSET_RESET_CONFIG = "earliest"
→ 처음 실행 시, 토픽에 쌓여 있던 예전 메시지까지 모두 읽도록
AUTO_OFFSET_RESET_CONFIG = "latest"
→ 처음 실행 시, 토픽에 최신 메세지부터 읽도록
ConcurrentKafkaListenerContainerFactory<String, String>@KafkaListener가 내부적으로 사용할 Container를 생성해 주는 FactorycontainerFactory이름으로@KafkaListener에 연결해서 사용
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// Producer 설정 (String, String)
@Bean
public ProducerFactory<String, String> stringProducerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka 서버 위치
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// key, value를 문자열로 직렬화
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 균등 분배 (Round Robin)
// 설정하지 않을 경우 1개의 Partition 에만 key가 없는 메세지를 batch 가 찰 때 까지 보내는 것이 Default (Sticky Partitioner)
// 키가 있으면 Sticky Partitioner 는 작동하지 않음
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
return new DefaultKafkaProducerFactory<>(props);
}
// Producer가 사용할 KafkaTemplate
@Bean
public KafkaTemplate<String, String> stringKafkaTemplate() {
return new KafkaTemplate<>(stringProducerFactory());
}
// Consumer 설정 (String, String)
@Bean
public ConsumerFactory<String, String> stringConsumerFactory() {
Map<String, Object> props = new HashMap<>();
// Kafka 서버 위치
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 이 Consumer가 속할 그룹 아이디
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
// key, value를 문자열로 역직렬화
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 새로운 Consumer 로 처음 시작할 때, 토픽 맨 앞부터 읽도록 설정 - earliest
// 새로운 Consumer 로 처음 시작할 때, 토픽 맨 뒤부터 (최신 메세지부터) 읽도록 설정 - latest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
// @KafkaListener가 사용할 ListenerContainerFactory
// ConsumerFactory를 만들어 주고 이를 감지하는 Listener
// 값이 들어온 것을 인지
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> stringKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 만든 consumer 등록
factory.setConsumerFactory(stringConsumerFactory());
return factory;
}
// String, custom Object 형태 처리
// Boot 4 버전 부터는 JsonDeserializer 가 Deprecated, 직접 서비스단에서 우리가 변환해줘야 함
// 직렬화 (Producer)
// Object -> objectMapper.writeValueAsString() -> String -> Kafka
// 역직렬화 (Consumer)
// Kafka -> String -> objectMapper.readValue() -> Object
// 직접 명시하여 형변환 하는 것이 차이
}
Boot 3.x 의 경우의 커스텀 객체 처리 포함 설정부
// Boot 3.x 일 경우
// Producer 설정 (String, SimpleEvent) - JSON 직렬화
@Bean
public ProducerFactory<String, SimpleEvent> eventProducerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, SimpleEvent> eventKafkaTemplate() {
return new KafkaTemplate<>(eventProducerFactory());
}
// Consumer 설정 (String, SimpleEvent) - JSON 역직렬화
@Bean
public ConsumerFactory<String, SimpleEvent> eventConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-event-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// SimpleEvent 타입을 역직렬화하기 위한 JsonDeserializer 생성
JsonDeserializer<SimpleEvent> deserializer = new JsonDeserializer<>(SimpleEvent.class);
// 역직렬화를 허용할 패키지 지정 (보안 및 타입 안전성)
deserializer.addTrustedPackages("com.example.kafka.event");
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SimpleEvent> eventKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SimpleEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(eventConsumerFactory());
return factory;
}
Listener, Producer, Controller 클래스
단순 String 전달의 경우
Producer
@Service
@RequiredArgsConstructor
public class SimpleMessageProducer {
// 상수로 Topic 이름을 관리하는 것이 좋음
private static final String TOPIC_SIMPLE = "simple-messages";
// kafka Config에서 설정한 KafkaTemplate<String, String> 설정 불러오기
private final KafkaTemplate<String, String> stringKafkaTemplate;
// 단순히 지정된 Topic 으로 메세지를 제공
public void send(String message) {
stringKafkaTemplate.send(TOPIC_SIMPLE, message);
}
}
Listener
@Slf4j
@Component
public class SimpleMessageListener {
// 메세지를 보내기 위해서는 어디서 어디로 보내줄 것인지 지정해야함
@KafkaListener(
topics = "simple-messages", // simple-messages Topic 구독
groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
)
public void consume(String message) {
log.info("받은 메시지: {}", message);
}
// 위 코드대로만 하면 simple-group 이라는 Consumer Group 에 1개의 Consumer 가 붙은 구조
// 1개의 Consumer 가 3개의 Partition 관리
// 3개 까지 Consumer 를 늘려 Partition 수 대로 지정
@KafkaListener(
topics = "simple-messages", // simple-messages Topic 구독
groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
)
public void consume_2(String message) {
log.info("받은 메시지: {}", message);
}
@KafkaListener(
topics = "simple-messages", // simple-messages Topic 구독
groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
)
public void consume_3(String message) {
log.info("받은 메시지: {}", message);
}
}
DTO 를 통한 Json
Producer
@Service
@RequiredArgsConstructor
public class SimpleEventProducer {
private final ObjectMapper objectMapper;
private static final String TOPIC_EVENT = "simple-event";
private final KafkaTemplate<String, String> stringKafkaTemplate;
// Boot 4 부터는 직접 형변환 명시
public void send(SimpleEvent simpleEvent) {
stringKafkaTemplate.send(TOPIC_EVENT, objectMapper.writeValueAsString(simpleEvent));
}
}
Boot 3.x Producer
@Service
@RequiredArgsConstructor
public class SimpleEventProducer {
private static final String TOPIC = "simple-event";
private final KafkaTemplate<String, SimpleEvent> eventKafkaTemplate;
public void send(SimpleEvent event) {
eventKafkaTemplate.send(TOPIC, event);
}
}
Listener
@Slf4j
@Component
@RequiredArgsConstructor
public class SimpleEventListener {
private final ObjectMapper objectMapper;
// 메세지를 보내기 위해서는 어디서 어디로 보내줄 것인지 지정해야함
@KafkaListener(
topics = "simple-event", // simple-messages Topic 구독
groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
)
// Boot 4 버전 부터는 직접 형변환
public void consume(String message) {
SimpleEvent event = objectMapper.readValue(message, SimpleEvent.class);
log.info("받은 이벤트: {}", event.toString());
}
}
Boot 3.x Listener
@Slf4j
@Component
public class SimpleEventListener {
@KafkaListener(
topics = "simple-event",
groupId = "simple-event-group",
containerFactory = "eventKafkaListenerContainerFactory"
)
public void consume(SimpleEvent event) {
log.info("받은 이벤트: {}", event);
}
}
DTO
@Getter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SimpleEvent {
// 실제 메시지 내용
private String message;
// 작업자
private String worker;
// 이벤트 생성 시각
private LocalDateTime createdAt;
public SimpleEvent(String message) {
this.message = message;
this.worker = "tester";
this.createdAt = LocalDateTime.now();
}
}
Controller
@RestController
@RequestMapping("/api/simple")
@RequiredArgsConstructor
public class SimpleController {
private final SimpleMessageProducer simpleMessageProducer;
private final SimpleEventProducer simpleEventProducer;
@PostMapping("/messages")
public ResponseEntity<Void> sendSimpleMessage(@RequestBody SimpleSendRequest request) {
simpleMessageProducer.send(request.message());
return ResponseEntity.status(HttpStatus.CREATED).build();
}
@PostMapping("/events")
public ResponseEntity<Void> sendSimpleEvent(@RequestBody SimpleSendRequest request) {
simpleEventProducer.send(new SimpleEvent(request.message()));
return ResponseEntity.status(HttpStatus.CREATED).build();
}
결과
Boot 4 버전으로 실행 결과
DTO 를 통한 JSON 의 경우