inblog logo
|
LifeLog, DevLog
    Kafka

    Spring Boot + Kafka

    KYJTHEYJ's avatar
    KYJTHEYJ
    Apr 01, 2026
    Spring Boot + Kafka
    Contents
    Spring Boot + KafkaGradleapplication.ymldocker-compose.ymlKafkaConfigProducer 와 Consumer 설정producerConsumerBoot 3.x 의 경우의 커스텀 객체 처리 포함 설정부Listener, Producer, Controller 클래스단순 String 전달의 경우DTO 를 통한 Json결과

    Spring Boot + Kafka

    스프링부트 환경에서 Kafka 사용

    Gradle

    dependencies {
        implementation 'org.springframework.kafka:spring-kafka'
    }
    

    application.yml

    spring:
      kafka:
        bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    

    docker-compose.yml

    services:
      # --------------------------
      # Broker 1 (Node ID: 1)
      # --------------------------
      kafka-1:
        image: apache/kafka:3.7.0
        container_name: kafka-1
        ports:
          - "9092:9092"
        environment:
          KAFKA_NODE_ID: 1
          KAFKA_PROCESS_ROLES: broker,controller
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
    
          # listeners
          KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    
          # replication configs
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    
          KAFKA_LOG_DIRS: /var/lib/kafka/data
        volumes:
          - kafka1_data:/var/lib/kafka/data
        networks:
          - kafka-net
    
      # --------------------------
      # Broker 2 (Node ID: 2)
      # --------------------------
      kafka-2:
        image: apache/kafka:3.7.0
        container_name: kafka-2
        ports:
          - "9093:9092"
        environment:
          KAFKA_NODE_ID: 2
          KAFKA_PROCESS_ROLES: broker,controller
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
    
          # listeners
          KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    
          # replication configs
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    
          KAFKA_LOG_DIRS: /var/lib/kafka/data
        volumes:
          - kafka2_data:/var/lib/kafka/data
        networks:
          - kafka-net
    
      # --------------------------
      # Broker 3 (Node ID: 3)
      # --------------------------
      kafka-3:
        image: apache/kafka:3.7.0
        container_name: kafka-3
        ports:
          - "9094:9092"
        environment:
          KAFKA_NODE_ID: 3
          KAFKA_PROCESS_ROLES: broker,controller
          KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
    
          # listeners
          KAFKA_LISTENERS: CONTROLLER://:9093,INTERNAL://:29092,EXTERNAL://:9092
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9094
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    
          # replication configs
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
    
          KAFKA_LOG_DIRS: /var/lib/kafka/data
        volumes:
          - kafka3_data:/var/lib/kafka/data
        networks:
          - kafka-net
    
      # --------------------------
      # Kafka UI
      # --------------------------
      kafka-ui:
        image: provectuslabs/kafka-ui:latest
        container_name: kafka-ui
        ports:
          - "8088:8080"
        environment:
          KAFKA_CLUSTERS_0_NAME: local-kraft-cluster
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
          KAFKA_CLUSTERS_0_METADATAPROVIDER: "KAFKA"
          DYNAMIC_CONFIG_ENABLED: true
        depends_on:
          - kafka-1
          - kafka-2
          - kafka-3
        networks:
          - kafka-net
    
    volumes:
      kafka1_data:
      kafka2_data:
      kafka3_data:
    
    networks:
      kafka-net:
        driver: bridge
    

    KafkaConfig

    Producer 와 Consumer 설정

    producer

    • ProducerFactory<String, String>

      • Producer가 사용할 설정 정보를 선언

      • key와 value를 모두 문자열로 직렬화하기 위해 StringSerializer를 사용 (Apache Kafka)

    • KafkaTemplate<String, String>

      • Producer 대신 실제로 send()를 호출하는 역할

      • Kafka와 Spring boot가 통신하는 객체

    Consumer

    • ConsumerFactory<String, String>

      • Consumer가 어떤 방식으로 메시지를 읽을지 설정

      • key와 value를 문자열로 역직렬화하기 위해 StringDeserializer를 사용

      • GROUP_ID_CONFIG = "simple-group"

        → 이 Consumer가 속한 그룹 이름

      • AUTO_OFFSET_RESET_CONFIG = "earliest"

      → 처음 실행 시, 토픽에 쌓여 있던 예전 메시지까지 모두 읽도록

      • AUTO_OFFSET_RESET_CONFIG = "latest"

      → 처음 실행 시, 토픽에 최신 메세지부터 읽도록

    • ConcurrentKafkaListenerContainerFactory<String, String>

      • @KafkaListener가 내부적으로 사용할 Container를 생성해 주는 Factory

      • containerFactory 이름으로 @KafkaListener에 연결해서 사용

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    
    @EnableKafka
    @Configuration
    public class KafkaConfig {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        // Producer 설정 (String, String)
        @Bean
        public ProducerFactory<String, String> stringProducerFactory() {
            Map<String, Object> props = new HashMap<>();
    
            // Kafka 서버 위치
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
            // key, value를 문자열로 직렬화
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
    		// 균등 분배 (Round Robin)
            // 설정하지 않을 경우 1개의 Partition 에만 key가 없는 메세지를 batch 가 찰 때 까지 보내는 것이 Default (Sticky Partitioner)
            // 키가 있으면 Sticky Partitioner 는 작동하지 않음
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
    
            return new DefaultKafkaProducerFactory<>(props);
        }
    
        // Producer가 사용할 KafkaTemplate
        @Bean
        public KafkaTemplate<String, String> stringKafkaTemplate() {
            return new KafkaTemplate<>(stringProducerFactory());
        }
    
        // Consumer 설정 (String, String)
        @Bean
        public ConsumerFactory<String, String> stringConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
    
            // Kafka 서버 위치
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    
            // 이 Consumer가 속할 그룹 아이디
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
    
            // key, value를 문자열로 역직렬화
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            // 새로운 Consumer 로 처음 시작할 때, 토픽 맨 앞부터 읽도록 설정 - earliest
            // 새로운 Consumer 로 처음 시작할 때, 토픽 맨 뒤부터 (최신 메세지부터) 읽도록 설정 - latest
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        // @KafkaListener가 사용할 ListenerContainerFactory
        // ConsumerFactory를 만들어 주고 이를 감지하는 Listener
        // 값이 들어온 것을 인지
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> stringKafkaListenerContainerFactory() {
    
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();      
            // 만든 consumer 등록
            factory.setConsumerFactory(stringConsumerFactory());
            
            return factory;
        }
        
        // String, custom Object 형태 처리
        // Boot 4 버전 부터는 JsonDeserializer 가 Deprecated, 직접 서비스단에서 우리가 변환해줘야 함
    
        // 직렬화 (Producer)
        // Object -> objectMapper.writeValueAsString() -> String -> Kafka
    
        // 역직렬화 (Consumer)
        // Kafka -> String -> objectMapper.readValue() -> Object
        
        // 직접 명시하여 형변환 하는 것이 차이
    }
    

    Boot 3.x 의 경우의 커스텀 객체 처리 포함 설정부

      // Boot 3.x 일 경우
      // Producer 설정 (String, SimpleEvent) - JSON 직렬화
      @Bean
      public ProducerFactory<String, SimpleEvent> eventProducerFactory() {
          Map<String, Object> props = new HashMap<>();
    
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
          return new DefaultKafkaProducerFactory<>(props);
      }
    
      @Bean
      public KafkaTemplate<String, SimpleEvent> eventKafkaTemplate() {
          return new KafkaTemplate<>(eventProducerFactory());
      }
    
      // Consumer 설정 (String, SimpleEvent) - JSON 역직렬화
      @Bean
      public ConsumerFactory<String, SimpleEvent> eventConsumerFactory() {
          Map<String, Object> props = new HashMap<>();
    
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-event-group");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
          // SimpleEvent 타입을 역직렬화하기 위한 JsonDeserializer 생성
          JsonDeserializer<SimpleEvent> deserializer = new JsonDeserializer<>(SimpleEvent.class);
    
          // 역직렬화를 허용할 패키지 지정 (보안 및 타입 안전성)
          deserializer.addTrustedPackages("com.example.kafka.event");
    
          return new DefaultKafkaConsumerFactory<>(
                  props,
                  new StringDeserializer(),
                  deserializer
          );
      }
    
      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, SimpleEvent> eventKafkaListenerContainerFactory() {
    
          ConcurrentKafkaListenerContainerFactory<String, SimpleEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    
          factory.setConsumerFactory(eventConsumerFactory());
          return factory;
      }
     
    

    Listener, Producer, Controller 클래스

    단순 String 전달의 경우

    • Producer

    @Service
    @RequiredArgsConstructor
    public class SimpleMessageProducer {
        // 상수로 Topic 이름을 관리하는 것이 좋음
        private static final String TOPIC_SIMPLE = "simple-messages";
    
        // kafka Config에서 설정한 KafkaTemplate<String, String> 설정 불러오기
        private final KafkaTemplate<String, String> stringKafkaTemplate;
    
        // 단순히 지정된 Topic 으로 메세지를 제공
        public void send(String message) {
            stringKafkaTemplate.send(TOPIC_SIMPLE, message);
        }
    }
    
    • Listener

    @Slf4j
    @Component
    public class SimpleMessageListener {
        // 메세지를 보내기 위해서는 어디서 어디로 보내줄 것인지 지정해야함
        @KafkaListener(
                topics = "simple-messages", // simple-messages Topic 구독
                groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
                containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
        )
    
        public void consume(String message) {
            log.info("받은 메시지: {}", message);
        }
    
        // 위 코드대로만 하면 simple-group 이라는 Consumer Group 에 1개의 Consumer 가 붙은 구조
        // 1개의 Consumer 가 3개의 Partition 관리
    
        // 3개 까지 Consumer 를 늘려 Partition 수 대로 지정
        @KafkaListener(
                topics = "simple-messages", // simple-messages Topic 구독
                groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
                containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
        )
    
        public void consume_2(String message) {
            log.info("받은 메시지: {}", message);
        }
    
        @KafkaListener(
                topics = "simple-messages", // simple-messages Topic 구독
                groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
                containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
        )
    
        public void consume_3(String message) {
            log.info("받은 메시지: {}", message);
        }
    }
    

    DTO 를 통한 Json

    • Producer

    @Service
    @RequiredArgsConstructor
    public class SimpleEventProducer {
        private final ObjectMapper objectMapper;
    
        private static final String TOPIC_EVENT = "simple-event";
    
        private final KafkaTemplate<String, String> stringKafkaTemplate;
    
        // Boot 4 부터는 직접 형변환 명시
        public void send(SimpleEvent simpleEvent) {
            stringKafkaTemplate.send(TOPIC_EVENT, objectMapper.writeValueAsString(simpleEvent));
        }
    }
    
    • Boot 3.x Producer

    @Service
    @RequiredArgsConstructor
    public class SimpleEventProducer {
    
        private static final String TOPIC = "simple-event";
    
        private final KafkaTemplate<String, SimpleEvent> eventKafkaTemplate;
    
        public void send(SimpleEvent event) {
            eventKafkaTemplate.send(TOPIC, event);
        }
    }
    
    • Listener

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class SimpleEventListener {
        private final ObjectMapper objectMapper;
    
        // 메세지를 보내기 위해서는 어디서 어디로 보내줄 것인지 지정해야함
        @KafkaListener(
                topics = "simple-event", // simple-messages Topic 구독
                groupId = "simple-group", // Config에서 사용한 그룹과 일치시켜야함
                containerFactory = "stringKafkaListenerContainerFactory" // Config 에서 만든 설정을 이용
        )
    
        // Boot 4 버전 부터는 직접 형변환
        public void consume(String message) {
            SimpleEvent event = objectMapper.readValue(message, SimpleEvent.class);
            log.info("받은 이벤트: {}", event.toString());
        }
    }
    
    • Boot 3.x Listener

    @Slf4j
    @Component
    public class SimpleEventListener {
    
        @KafkaListener(
            topics = "simple-event",
            groupId = "simple-event-group",
            containerFactory = "eventKafkaListenerContainerFactory"
        )
        public void consume(SimpleEvent event) {
            log.info("받은 이벤트: {}", event);
        }
    }
    
    • DTO

    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class SimpleEvent {
        // 실제 메시지 내용
        private String message;
    
        // 작업자
        private String worker;
    
        // 이벤트 생성 시각
        private LocalDateTime createdAt;
    
        public SimpleEvent(String message) {
            this.message = message;
            this.worker = "tester";
            this.createdAt = LocalDateTime.now();
        }
    }
    
    • Controller

    @RestController
    @RequestMapping("/api/simple")
    @RequiredArgsConstructor
    public class SimpleController {
        private final SimpleMessageProducer simpleMessageProducer;
        private final SimpleEventProducer simpleEventProducer;
    
        @PostMapping("/messages")
        public ResponseEntity<Void> sendSimpleMessage(@RequestBody SimpleSendRequest request) {
            simpleMessageProducer.send(request.message());
            return ResponseEntity.status(HttpStatus.CREATED).build();
        }
    
        @PostMapping("/events")
        public ResponseEntity<Void> sendSimpleEvent(@RequestBody SimpleSendRequest request) {
            simpleEventProducer.send(new SimpleEvent(request.message()));
            return ResponseEntity.status(HttpStatus.CREATED).build();
        }
    

    결과

    • Boot 4 버전으로 실행 결과

    • DTO 를 통한 JSON 의 경우

    Share article
    Contents
    Spring Boot + KafkaGradleapplication.ymldocker-compose.ymlKafkaConfigProducer 와 Consumer 설정producerConsumerBoot 3.x 의 경우의 커스텀 객체 처리 포함 설정부Listener, Producer, Controller 클래스단순 String 전달의 경우DTO 를 통한 Json결과

    LifeLog, DevLog - https://github.com/KYJTHEYJ

    RSS·Powered by Inblog