본문 바로가기

Spring Kafka 설정 정리 본문

Backend

Spring Kafka 설정 정리

00rigin 2025. 6. 12. 20:19

Config

Producer

@Configuration
class KafkaProducerConfig(
    @Value("\${-}")
    private val bootstrapServers: String,
    @Value("\${-}")
    private val topicA: String,
    @Value("\${-}")
    private val topicB: String,
    @Value("\${-}")
    private val topicC: String,
    private val environment: Environment
) {

  // Kafka 메세지를 생성하는 Producer 인스턴스를 생성합니다.
  // ProducerFactory<String, Any> : produce 메세지의 key를 string, value를 Any(Json 직렬화 한 것)로 잡습니다.
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        return DefaultKafkaProducerFactory(producerConfigs())
    }

    @Bean
    fun producerConfigs(): Map<String, Any> {
        val props = HashMap<String, Any>()
        // 연결한 카프카 클러스터의 broker 주소를 세팅합니다.
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
      // 카프카 메세지의 "key" 시리얼라이저
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
      // 카프카 메세지의 "Value" 시리얼라이저
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
        return props
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> {
        return KafkaTemplate(producerFactory())
    }

    // 해당 kafka 관리자를 생성합니다.
    // 여기서는 Topic을 관리하는 역할을 수행합니다.
    @Bean
    fun kafkaAdmin(): KafkaAdmin {
        val configs = mapOf<String, Any>(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers
        )
        return KafkaAdmin(configs)
    }

    // 환결별로 토픽의 파티션 수와 레플리케이션 수를 다르게 설정합니다.
    // 파티션은 consumer의 group-id에 따라 묶일 수 있기 때문에 잘 관리 해야 합니다.
    private fun getTopicByEnvironment(topicName: String): NewTopic {
        return if(SpringProfiles.isActive(environment, SpringProfiles.KR_PROD)) {
            TopicBuilder.name(topicName)
                .partitions(1)
                .replicas(2) // PR 에서만 replication 2 설정
                .build()

        }else {
            TopicBuilder.name(topicName)
                .partitions(1)
                .replicas(1)
                .build()
        }

    }

    // Springboot 실행시 Topic을 등록합니다.
    @Bean
    fun kafkaTopics(): NewTopics {
        val topics = listOf(
            topicA,
            topicB,
            topicC,
        ).map { getTopicByEnvironment(it) }
        return NewTopics(
            *topics.toTypedArray()
        )
    }
}

집고 넘어갈 부분

파티션

  • Kafka에서 하나의 토픽(topic)은 여러 개의 파티션으로 분할될 수 있다.
  • 파티션은 메시지가 저장되는 물리적인 단위이자, 처리 병렬성의 기본 단위
topic: user-events
  ├── partition-0: [msg1, msg2, msg3, ...]
  ├── partition-1: [msg4, msg5, msg6, ...]
  ├── partition-2: [msg7, msg8, msg9, ...]
 
  • 메시지는 하나의 파티션에만 저장됨
  • 동일한 파티션 안에서는 메시지 순서(order)가 보장됨
  • 파티션 수를 늘리면 더 많은 consumer가 동시에 처리 가능 (scale-out)
  • 메시지는 key 해시 기반 또는 round-robin으로 파티션에 배정

핵심 포인트

  • 병렬 처리, 확장을 위해 반드시 고려되어야 함
  • 파티션 수보다 컨슈머 수가 많으면 일부 컨슈머는 일 안 함

레플리케이션

  • 고가용성을 위해 각 파티션을 여러 브로커에 복제한다.
    • 하나의 브로커가 down 되더라도, 다른 브로커에 복제된 파티션이 있기에 안정성을 향상시킨다.
    • 하나는 leader 포지션, 다른 파티션은 follower 포지션
    • leader가 죽으면(하나의 브로커가 죽으면), 리더 선출을 통해 새로운 리더를 선정하고 계속 일을 수행한다.
  • Leader만 read/write를 수행 → follower가 리더의 정보를 복제해서 가지고 있음

 

Consumer

Config

 
@ConditionalOnProperty(
    name = ["kafka.enabled"],
    havingValue = "true", // application.yml에서 kafka.enabled=true일 때만 이 설정이 적용됨
    matchIfMissing = false // kafka.enabled가 없으면 Kafka를 비활성화 // Test 환경 활용으로 추가
)
@Configuration
@EnableKafka // 카프카 활성화 -> Listener 동작
class KafkaConsumerConfig(
    @Value("\${-}")
    private val bootstrapServers: String,
) {

    @Bean
    fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Any>> {
        
        // Kafka 메시지를 병렬로 처리할 수 있는 기본 팩토리
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        
        // 에러 핸들러 설정: 즉시 재시도 없이 최대 3번 재시도 (FixedBackOff(delayMs=0, maxAttempts=3))
        val errorHandler = DefaultErrorHandler(FixedBackOff(0L, 3L))
        
        // IllegalStateException, SerializationException은 재시도하지 않음
        errorHandler.addNotRetryableExceptions(IllegalStateException::class.java)
        errorHandler.addNotRetryableExceptions(SerializationException::class.java)
        factory.setCommonErrorHandler(errorHandler)
        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        return DefaultKafkaConsumerFactory(consumerConfig())
    }

    @Bean
    fun consumerConfig(): Map<String, Any> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers

        // Key 역직렬화 세팅. ErrorHandler로 감싸서 사용
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java

        // value 역직렬화 세팅. ErrorHandler로 감싸서 사용
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java

        // 역직렬화 허용 패키지 (메세지 구조 정의가 들어있는 패키지. *은 전체 허용)
        props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        
        // Kafka 메시지 헤더(__TypeId__)에 명시된 클래스 타입 정보를 기반으로 역직렬화 수행
        // com.midasit.motive.infra.component.dto.kafkaMessage.ViewMilestoneMessage 
        // 위와 같은 형식으로 헤더에 들어갑니다. -> 다른 서버에서 오는 메세지는 이것만으로 처리 불가능.
        props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = true

        return props
    }

}

Consumer

 
@Component
@Profile("!local")
class KafkaMessageConsumer(
) {
    @KafkaListener(
        topics = ["\${-}"],
        groupId = "\${kafka.group-id}"
    )
    fun handleLikeMilestoneMessage(message: MessageA) {
        
    }

    @KafkaListener(
        topics = ["\${-}"],
        groupId = "\${kafka.group-id}"
    )
    fun handleViewMilestoneMessage(message: MessageB) {
        
    }

    @KafkaListener(
        topics = ["\${-}"],
        groupId = "\${kafka.group-id}"
    )
    fun handleDraftNotificationMessage(message: MessageC) {
        
    }

    @KafkaListener(
        topics = ["\${-}"],
        groupId = "\${kafka.group-id}"
    )
    fun handleMemberProfileUpdateMessage(message: MessageD) {
        
    }
}

집고 넘어갈 부분

Topic

  • Topic: sub 할 토픽을 지정합니다.

GroupId

분산 서버 환경에서 메세지의 중복 처리를 방지하기 위해 “필수” 입니다.
같은 groupId를 가진 컨슈머들은 한개의 컨슈머 그룹 을 형성합니다.

1. 파티션은 컨슈머 그룹 내에서 분배된다

  • 하나의 토픽이 여러 파티션으로 구성되어 있고
  • 하나의 consumer group 안에 여러 컨슈머가 있으면
  • Kafka는 각 파티션을 그룹 내 컨슈머에게 나눠줌

📌 즉, group-id는 같은 메시지를 공유하지 않도록 묶는 단위

topic: user-events (partition-0, partition-1, partition-2)
group-id: event-consumer-group

Consumer A → partition-0, partition-2  
Consumer B → partition-1

같은 group-id로 묶은 서버A의 Consumer와 서버B의 Consumer가 처리하는 메세지가 다릅니다.
AWS로 Scale-out 되는 경우, group-id를 동일하게 맞춰주어야 서로 다른 서버에서 같은 메세지를 처리하지 않게 됩니다.

2. group-id가 다르면 → 서로 다른 그룹으로 메시지 따로 소비 (브로드캐스트)

Consumer A (group-id = group-1)  
Consumer B (group-id = group-2)

→ 같은 메시지를 각각 받음
→ 완전히 독립된 consumer 그룹으로 간주됨

  • 그룹 아이디가 같아도, topic이 다르면, 상관 ㄴㄴ 입니다.

3. 파티션 수 < 컨슈머 수 → 일부 컨슈머는 놀게 됨

  • 파티션 2개, 컨슈머 3개 → 1명은 파티션 할당 못 받고 대기만 함

4. 파티션 수 > 컨슈머 수 → 일부 컨슈머가 여러 파티션 처리

  • 파티션 4개, 컨슈머 2개 → 각 컨슈머가 2개씩 파티션 처리
 

만약, DV 환경과 Local 개발 환경에서 groupId를 동일하게 가져가고 있다면,

DV 쪽 리스너에서 메세지가 처리 되는 경우도 있습니다.

local 에서는 group-id를 다르게 가져가야 해요!

정리

  • 병렬 소비하고 싶다면 → 파티션 수 ≥ 컨슈머 수로 설정
  • group-id는 같은 서비스/로직끼리만 공유하는 게 좋음
  • KafkaListener를 scale-out할 때 group-id만 같게 유지하면 파티션이 자동 분배됨

Message Deserialize 방법

Kafka 에서는 pub/sub 구조이기 때문에 서로 메세지 형식을 명확히 정의하고, 잘 다루어야 합니다.
특히 consumer는 해당 토픽에 어떤 형태의 메세지가 들어올지 알고, 알잘딱깔센 하게 메세지를 역직렬화 해야 합니다.
Confluent Kafka 에서는 schema registry 를 지원하여 양쪽에서 동일한 메세지를 사용하고 있는지 검증할 수 있지만, 우리는 알잘딱깔센 하게 잘 해주어야 합니다.
그럼 어떤 방식으로 consumer에서 메세지를 역직렬화 할 수 있을까요?

1. 메세지 헤더 타입을 통한 결정

producer config
 

props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
props[JsonSerializer.ADD_TYPE_INFO_HEADERS] = true // 기본값이 true

consumer config

props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = true

 
producer에서 메세지를 보낼때, 메세지의 class 이름을 header에 넣어 보내는 방식입니다.
 
ex) com.midasit.motive.infra.component.dto.kafkaMessage.ViewMilestoneMessage
 
consumer에서 메세지 header의 클래스 명을 보고, 해당 클래스로 매핑을 시도 합니다.
하지만, pub 하는 곳의 패키지와 sub 하는 곳의 패키지가 다르다면, 사용할 수 없습니다.
가장 쉬운 방법이기에, 공통 라이브러리 (nexus 등)을 사용중이라면, 선택할 만 합니다!
이렇게 하면, 양쪽 모두 동일한 클래스 이름을 사용하게 되어 디시리얼라이즈 할 때, 클래스를 참조할 수 있습니다.
 

2. 메세지 헤더 타입(개선)

producer 측의 메세지 이름 전체를 넣는 대신, 단순한 id를 직접 정의하여 헤더에 넣을 수 있습니다.
header ID로 likeMilestoneMessage 이렇게만 넣는 방식이죠.
producer config

@Bean
fun producerConfigs(): Map<String, Any> {
  val props = HashMap<String, Any>()
  props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
  props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
  props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
  // 1. 헤더에 타입 인포를 넣을게!
  props[JsonSerializer.ADD_TYPE_INFO_HEADERS] = true
  // 2. 근데, 자동으로 하지 말고, 타입 매핑을 지정해 줄게!
  props[JsonSerializer.ADD_TYPE_INFO_HEADERS]
  // 3. 아래 형식에 맞춰 넣을게
  props[JsonSerializer.TYPE_MAPPINGS] = mapOf(
      "messageA" to "com.~~.MessageA",
      "messageB" to "com.~~.MessageB",
      "messageC" to "com.~~.MessageC"
  )

  return props;
}

혹은, JsonSerializer를 수정하는 방법도 있습니다.

@Bean
fun producerFactory(): ProducerFactory<String, Any> {
    val props = producerConfigs()
    val jsonSerializer = JsonSerializer<Any>()
    val typeMapper = DefaultJackson2JavaTypeMapper().apply {
        typePrecedence = (TypePrecedence.TYPE_ID)
        idClassMapping = mapOf(
            "messageA" to MessageA::class.java,
            "messageB" to MessageB::class.java,
            "messageC" to MessageC::class.java
        )
    }
    
    jsonSerializer.typeMapper = typeMapper
    jsonSerializer.isAddTypeInfo = true
    return DefaultKafkaProducerFactory(props, StringSerializer(), jsonSerializer )
}

consumer config

@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
    // 1) 공통 consumer 프로퍼티
    val props = mutableMapOf<String, Any>(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
        ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS to JsonDeserializer::class.java
    )

    // 2) 헤더 ID → 로컬 클래스 매핑용 TypeMapper
    val typeMapper = DefaultJackson2JavaTypeMapper().apply {
        typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID
        addTrustedPackages("com.midasit.motive.infra.component.dto.kafkaMessage")
        idClassMapping = mapOf(
            "MessageA"  to com.~~.MessageA::class.java,
            "MessageB" to com.~~.MessageB::class.java,
            "MessageC" to com.~~.MessageC::class.java
        )
    }

    // 3) JsonDeserializer에 TypeMapper 주입, 헤더 유지
    val jsonDeserializer = JsonDeserializer<Any>().apply {
        setTypeMapper(typeMapper)
        setRemoveTypeHeaders(false)
    }

    // 4) 최종 ConsumerFactory 반환
    return DefaultKafkaConsumerFactory(
        props,
        StringDeserializer(),
        jsonDeserializer
    )
}

이렇게 producer와 consumer가 약속한 ID 를 통해 매핑을 하는 방법이 있습니다.
 

3. jacksonTypeMapper 사용

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
    JsonSubTypes.Type(value = MessageA::class, name = "MessageA"),
    JsonSubTypes.Type(value = MessageB::class, name = "MessageB"),
    JsonSubTypes.Type(value = MessageC::class, name = "MessageC"),
)
interface BaseMessage {
}

class MessageA (
    val baseContentId: Long,
    val receiverUserSn: Long
): BaseMessage

메세지에 Type name을 명시합니다.
Producer에서 직렬화를 할 때, 메세지 본문에 "type": "MessageA" 가 추가 됩니다.
Consumer 에서 “type“ 필드를 보고 어떤 클래스로 매핑할지 결정합니다.
결정을 위해서는 똑같은 형식으로 작성된 메세지가 있는 곳을 명시해줘야 합니다.

props[JsonDeserializer.USE_TYPE_INFO_HEADERS]   = false
// 메세지 벨류가 들어오면, VALUE_DEFAULT_TYPE 으로 일단 바인딩
props[JsonDeserializer.VALUE_DEFAULT_TYPE]     = "com.~~.BaseMessage"
props[JsonDeserializer.TRUSTED_PACKAGES]       = "com.~~"

이렇게 하면, Kafka에서 받아온 메세지에 "type": "MessageA" 가 있을때,

  1. VALUE_DEFAULT_TYPE 로 지정한 BaseMessage에 일단 바인딩
  2. Jackson 이 @JsonTypeInfo(property="type") 를 보고
    " MessageA " 에 매핑된 MessageA 인스턴스로 변환해 줍니다.

즉, 쟉-슨 매퍼를 통해 역직렬화를 수행하고 싶다면, 내가 받을 모든 메세지가 VALUE_DEFAULT_TYPE을 상속해야 한다는 이야기 입니다.
참 귀찮아지죠잉?
 

4. RecordMessageConverter 사용 (jackson 기반)

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
    JsonSubTypes.Type(value = MessageA::class, name = "MessageA"),
    JsonSubTypes.Type(value = MessageB::class, name = "MessageB"),
    JsonSubTypes.Type(value = MessageC::class, name = "MessageC"),
)
interface BaseMessage {
}

class MessageA (
    val baseContentId: Long,
    val receiverUserSn: Long
): BaseMessage

Producer/Consumer 에서 사용할 message에 타입을 정의 합니다.
“name“에 정의한 이름이 message body에 “type” 컬럼으로 들어갑니다.
Consumer에서 name 을 기반으로 @Payload 에서 적절한 타입으로 바꾸도록 합니다.

@ConditionalOnProperty(
    name = ["kafka.enabled"],
    havingValue = "true",
    matchIfMissing = false
)
@Configuration
@EnableKafka
class KafkaConsumerConfig(
    @Value("\${kafka.bootstrap-servers}")
    private val bootstrapServers: String,
) {

    @Bean
    fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Any>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        val errorHandler = DefaultErrorHandler(FixedBackOff(0L, 3L))
        errorHandler.addNotRetryableExceptions(IllegalStateException::class.java)
        errorHandler.addNotRetryableExceptions(SerializationException::class.java)
        factory.setCommonErrorHandler(errorHandler)

		// customConfig에서 Value를 String으로 가져왔고, 그걸 Json으로 치고 convert
        val converter = StringJsonMessageConverter()
        val typeMapper = DefaultJackson2JavaTypeMapper().apply {
            typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.INFERRED
            classIdFieldName = "type" // 타입이 명시된 컬럼 이름
            addTrustedPackages("*")
        }
        converter.typeMapper = typeMapper
        factory.setRecordMessageConverter(converter)

        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, Any> {
        return DefaultKafkaConsumerFactory(consumerConfig())
    }

    @Bean
    fun consumerConfig(): Map<String, Any> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java

        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        // StringJsonMessageConverter를 사용하기 위해서 String으로 먼저 deserialize 합니다.
        props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = StringDeserializer::class.java

        props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        // headeinfo를 사용하지 않고, RecordMessageConverter를 통해 타입을 추론합니다.
        props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = false

        return props
    }

}
 
 
@KafkaListener(
        topics = ["test-topic"],
        groupId = "\${kafka.group-id}"
    )
    fun handleTestMessage(
        @Payload message: Test3Message
    ) {
        // 테스트용 메시지 처리 로직
        println("Received test message: ${message.message}")
    }​

RecordMessageConverter interface의 구현채인 StringJasonMessageConverter를 사용합니다.
body의 “type” 을 보고 @Payload message: Test3Message으로 자동 역직렬화를 수행합니다.


 

Comments