Spring Kafka 설정 정리 본문
Config
Producer
@Configuration
class KafkaProducerConfig(
@Value("\${-}")
private val bootstrapServers: String,
@Value("\${-}")
private val topicA: String,
@Value("\${-}")
private val topicB: String,
@Value("\${-}")
private val topicC: String,
private val environment: Environment
) {
// Kafka 메세지를 생성하는 Producer 인스턴스를 생성합니다.
// ProducerFactory<String, Any> : produce 메세지의 key를 string, value를 Any(Json 직렬화 한 것)로 잡습니다.
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
return DefaultKafkaProducerFactory(producerConfigs())
}
@Bean
fun producerConfigs(): Map<String, Any> {
val props = HashMap<String, Any>()
// 연결한 카프카 클러스터의 broker 주소를 세팅합니다.
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
// 카프카 메세지의 "key" 시리얼라이저
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
// 카프카 메세지의 "Value" 시리얼라이저
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
return props
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, Any> {
return KafkaTemplate(producerFactory())
}
// 해당 kafka 관리자를 생성합니다.
// 여기서는 Topic을 관리하는 역할을 수행합니다.
@Bean
fun kafkaAdmin(): KafkaAdmin {
val configs = mapOf<String, Any>(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers
)
return KafkaAdmin(configs)
}
// 환결별로 토픽의 파티션 수와 레플리케이션 수를 다르게 설정합니다.
// 파티션은 consumer의 group-id에 따라 묶일 수 있기 때문에 잘 관리 해야 합니다.
private fun getTopicByEnvironment(topicName: String): NewTopic {
return if(SpringProfiles.isActive(environment, SpringProfiles.KR_PROD)) {
TopicBuilder.name(topicName)
.partitions(1)
.replicas(2) // PR 에서만 replication 2 설정
.build()
}else {
TopicBuilder.name(topicName)
.partitions(1)
.replicas(1)
.build()
}
}
// Springboot 실행시 Topic을 등록합니다.
@Bean
fun kafkaTopics(): NewTopics {
val topics = listOf(
topicA,
topicB,
topicC,
).map { getTopicByEnvironment(it) }
return NewTopics(
*topics.toTypedArray()
)
}
}
집고 넘어갈 부분
파티션
- Kafka에서 하나의 토픽(topic)은 여러 개의 파티션으로 분할될 수 있다.
- 파티션은 메시지가 저장되는 물리적인 단위이자, 처리 병렬성의 기본 단위
topic: user-events
├── partition-0: [msg1, msg2, msg3, ...]
├── partition-1: [msg4, msg5, msg6, ...]
├── partition-2: [msg7, msg8, msg9, ...]
- 메시지는 하나의 파티션에만 저장됨
- 동일한 파티션 안에서는 메시지 순서(order)가 보장됨
- 파티션 수를 늘리면 더 많은 consumer가 동시에 처리 가능 (scale-out)
- 메시지는 key 해시 기반 또는 round-robin으로 파티션에 배정
핵심 포인트
- 병렬 처리, 확장을 위해 반드시 고려되어야 함
- 파티션 수보다 컨슈머 수가 많으면 일부 컨슈머는 일 안 함
레플리케이션
- 고가용성을 위해 각 파티션을 여러 브로커에 복제한다.
- 하나의 브로커가 down 되더라도, 다른 브로커에 복제된 파티션이 있기에 안정성을 향상시킨다.
- 하나는 leader 포지션, 다른 파티션은 follower 포지션
- leader가 죽으면(하나의 브로커가 죽으면), 리더 선출을 통해 새로운 리더를 선정하고 계속 일을 수행한다.
- Leader만 read/write를 수행 → follower가 리더의 정보를 복제해서 가지고 있음
Consumer
Config
@ConditionalOnProperty(
name = ["kafka.enabled"],
havingValue = "true", // application.yml에서 kafka.enabled=true일 때만 이 설정이 적용됨
matchIfMissing = false // kafka.enabled가 없으면 Kafka를 비활성화 // Test 환경 활용으로 추가
)
@Configuration
@EnableKafka // 카프카 활성화 -> Listener 동작
class KafkaConsumerConfig(
@Value("\${-}")
private val bootstrapServers: String,
) {
@Bean
fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Any>> {
// Kafka 메시지를 병렬로 처리할 수 있는 기본 팩토리
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.consumerFactory = consumerFactory()
// 에러 핸들러 설정: 즉시 재시도 없이 최대 3번 재시도 (FixedBackOff(delayMs=0, maxAttempts=3))
val errorHandler = DefaultErrorHandler(FixedBackOff(0L, 3L))
// IllegalStateException, SerializationException은 재시도하지 않음
errorHandler.addNotRetryableExceptions(IllegalStateException::class.java)
errorHandler.addNotRetryableExceptions(SerializationException::class.java)
factory.setCommonErrorHandler(errorHandler)
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
return DefaultKafkaConsumerFactory(consumerConfig())
}
@Bean
fun consumerConfig(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
// Key 역직렬화 세팅. ErrorHandler로 감싸서 사용
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
// value 역직렬화 세팅. ErrorHandler로 감싸서 사용
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
// 역직렬화 허용 패키지 (메세지 구조 정의가 들어있는 패키지. *은 전체 허용)
props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
// Kafka 메시지 헤더(__TypeId__)에 명시된 클래스 타입 정보를 기반으로 역직렬화 수행
// com.midasit.motive.infra.component.dto.kafkaMessage.ViewMilestoneMessage
// 위와 같은 형식으로 헤더에 들어갑니다. -> 다른 서버에서 오는 메세지는 이것만으로 처리 불가능.
props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = true
return props
}
}
Consumer
@Component
@Profile("!local")
class KafkaMessageConsumer(
) {
@KafkaListener(
topics = ["\${-}"],
groupId = "\${kafka.group-id}"
)
fun handleLikeMilestoneMessage(message: MessageA) {
}
@KafkaListener(
topics = ["\${-}"],
groupId = "\${kafka.group-id}"
)
fun handleViewMilestoneMessage(message: MessageB) {
}
@KafkaListener(
topics = ["\${-}"],
groupId = "\${kafka.group-id}"
)
fun handleDraftNotificationMessage(message: MessageC) {
}
@KafkaListener(
topics = ["\${-}"],
groupId = "\${kafka.group-id}"
)
fun handleMemberProfileUpdateMessage(message: MessageD) {
}
}
집고 넘어갈 부분
Topic
- Topic: sub 할 토픽을 지정합니다.
GroupId
분산 서버 환경에서 메세지의 중복 처리를 방지하기 위해 “필수” 입니다.
같은 groupId를 가진 컨슈머들은 한개의 컨슈머 그룹 을 형성합니다.
1. 파티션은 컨슈머 그룹 내에서 분배된다
- 하나의 토픽이 여러 파티션으로 구성되어 있고
- 하나의 consumer group 안에 여러 컨슈머가 있으면
- Kafka는 각 파티션을 그룹 내 컨슈머에게 나눠줌
📌 즉, group-id는 같은 메시지를 공유하지 않도록 묶는 단위
topic: user-events (partition-0, partition-1, partition-2)
group-id: event-consumer-group
Consumer A → partition-0, partition-2
Consumer B → partition-1
같은 group-id로 묶은 서버A의 Consumer와 서버B의 Consumer가 처리하는 메세지가 다릅니다.
AWS로 Scale-out 되는 경우, group-id를 동일하게 맞춰주어야 서로 다른 서버에서 같은 메세지를 처리하지 않게 됩니다.
2. group-id가 다르면 → 서로 다른 그룹으로 메시지 따로 소비 (브로드캐스트)
Consumer A (group-id = group-1)
Consumer B (group-id = group-2)
→ 같은 메시지를 각각 받음
→ 완전히 독립된 consumer 그룹으로 간주됨
- 그룹 아이디가 같아도, topic이 다르면, 상관 ㄴㄴ 입니다.
3. 파티션 수 < 컨슈머 수 → 일부 컨슈머는 놀게 됨
- 파티션 2개, 컨슈머 3개 → 1명은 파티션 할당 못 받고 대기만 함
4. 파티션 수 > 컨슈머 수 → 일부 컨슈머가 여러 파티션 처리
- 파티션 4개, 컨슈머 2개 → 각 컨슈머가 2개씩 파티션 처리
만약, DV 환경과 Local 개발 환경에서 groupId를 동일하게 가져가고 있다면,
DV 쪽 리스너에서 메세지가 처리 되는 경우도 있습니다.
local 에서는 group-id를 다르게 가져가야 해요!
정리
- 병렬 소비하고 싶다면 → 파티션 수 ≥ 컨슈머 수로 설정
- group-id는 같은 서비스/로직끼리만 공유하는 게 좋음
- KafkaListener를 scale-out할 때 group-id만 같게 유지하면 파티션이 자동 분배됨
Message Deserialize 방법
Kafka 에서는 pub/sub 구조이기 때문에 서로 메세지 형식을 명확히 정의하고, 잘 다루어야 합니다.
특히 consumer는 해당 토픽에 어떤 형태의 메세지가 들어올지 알고, 알잘딱깔센 하게 메세지를 역직렬화 해야 합니다.
Confluent Kafka 에서는 schema registry 를 지원하여 양쪽에서 동일한 메세지를 사용하고 있는지 검증할 수 있지만, 우리는 알잘딱깔센 하게 잘 해주어야 합니다.
그럼 어떤 방식으로 consumer에서 메세지를 역직렬화 할 수 있을까요?
1. 메세지 헤더 타입을 통한 결정
producer config
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
props[JsonSerializer.ADD_TYPE_INFO_HEADERS] = true // 기본값이 true
consumer config
props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = true
producer에서 메세지를 보낼때, 메세지의 class 이름을 header에 넣어 보내는 방식입니다.
ex) com.midasit.motive.infra.component.dto.kafkaMessage.ViewMilestoneMessage
consumer에서 메세지 header의 클래스 명을 보고, 해당 클래스로 매핑을 시도 합니다.
하지만, pub 하는 곳의 패키지와 sub 하는 곳의 패키지가 다르다면, 사용할 수 없습니다.
가장 쉬운 방법이기에, 공통 라이브러리 (nexus 등)을 사용중이라면, 선택할 만 합니다!
이렇게 하면, 양쪽 모두 동일한 클래스 이름을 사용하게 되어 디시리얼라이즈 할 때, 클래스를 참조할 수 있습니다.
2. 메세지 헤더 타입(개선)
producer 측의 메세지 이름 전체를 넣는 대신, 단순한 id를 직접 정의하여 헤더에 넣을 수 있습니다.
header ID로 likeMilestoneMessage 이렇게만 넣는 방식이죠.
producer config
@Bean
fun producerConfigs(): Map<String, Any> {
val props = HashMap<String, Any>()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
// 1. 헤더에 타입 인포를 넣을게!
props[JsonSerializer.ADD_TYPE_INFO_HEADERS] = true
// 2. 근데, 자동으로 하지 말고, 타입 매핑을 지정해 줄게!
props[JsonSerializer.ADD_TYPE_INFO_HEADERS]
// 3. 아래 형식에 맞춰 넣을게
props[JsonSerializer.TYPE_MAPPINGS] = mapOf(
"messageA" to "com.~~.MessageA",
"messageB" to "com.~~.MessageB",
"messageC" to "com.~~.MessageC"
)
return props;
}
혹은, JsonSerializer를 수정하는 방법도 있습니다.
@Bean
fun producerFactory(): ProducerFactory<String, Any> {
val props = producerConfigs()
val jsonSerializer = JsonSerializer<Any>()
val typeMapper = DefaultJackson2JavaTypeMapper().apply {
typePrecedence = (TypePrecedence.TYPE_ID)
idClassMapping = mapOf(
"messageA" to MessageA::class.java,
"messageB" to MessageB::class.java,
"messageC" to MessageC::class.java
)
}
jsonSerializer.typeMapper = typeMapper
jsonSerializer.isAddTypeInfo = true
return DefaultKafkaProducerFactory(props, StringSerializer(), jsonSerializer )
}
consumer config
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
// 1) 공통 consumer 프로퍼티
val props = mutableMapOf<String, Any>(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS to JsonDeserializer::class.java
)
// 2) 헤더 ID → 로컬 클래스 매핑용 TypeMapper
val typeMapper = DefaultJackson2JavaTypeMapper().apply {
typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID
addTrustedPackages("com.midasit.motive.infra.component.dto.kafkaMessage")
idClassMapping = mapOf(
"MessageA" to com.~~.MessageA::class.java,
"MessageB" to com.~~.MessageB::class.java,
"MessageC" to com.~~.MessageC::class.java
)
}
// 3) JsonDeserializer에 TypeMapper 주입, 헤더 유지
val jsonDeserializer = JsonDeserializer<Any>().apply {
setTypeMapper(typeMapper)
setRemoveTypeHeaders(false)
}
// 4) 최종 ConsumerFactory 반환
return DefaultKafkaConsumerFactory(
props,
StringDeserializer(),
jsonDeserializer
)
}
이렇게 producer와 consumer가 약속한 ID 를 통해 매핑을 하는 방법이 있습니다.
3. jacksonTypeMapper 사용
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = MessageA::class, name = "MessageA"),
JsonSubTypes.Type(value = MessageB::class, name = "MessageB"),
JsonSubTypes.Type(value = MessageC::class, name = "MessageC"),
)
interface BaseMessage {
}
class MessageA (
val baseContentId: Long,
val receiverUserSn: Long
): BaseMessage
메세지에 Type name을 명시합니다.
Producer에서 직렬화를 할 때, 메세지 본문에 "type": "MessageA" 가 추가 됩니다.
Consumer 에서 “type“ 필드를 보고 어떤 클래스로 매핑할지 결정합니다.
결정을 위해서는 똑같은 형식으로 작성된 메세지가 있는 곳을 명시해줘야 합니다.
props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = false
// 메세지 벨류가 들어오면, VALUE_DEFAULT_TYPE 으로 일단 바인딩
props[JsonDeserializer.VALUE_DEFAULT_TYPE] = "com.~~.BaseMessage"
props[JsonDeserializer.TRUSTED_PACKAGES] = "com.~~"
이렇게 하면, Kafka에서 받아온 메세지에 "type": "MessageA" 가 있을때,
- VALUE_DEFAULT_TYPE 로 지정한 BaseMessage에 일단 바인딩
- Jackson 이 @JsonTypeInfo(property="type") 를 보고
" MessageA " 에 매핑된 MessageA 인스턴스로 변환해 줍니다.
즉, 쟉-슨 매퍼를 통해 역직렬화를 수행하고 싶다면, 내가 받을 모든 메세지가 VALUE_DEFAULT_TYPE을 상속해야 한다는 이야기 입니다.
참 귀찮아지죠잉?
4. RecordMessageConverter 사용 (jackson 기반)
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = MessageA::class, name = "MessageA"),
JsonSubTypes.Type(value = MessageB::class, name = "MessageB"),
JsonSubTypes.Type(value = MessageC::class, name = "MessageC"),
)
interface BaseMessage {
}
class MessageA (
val baseContentId: Long,
val receiverUserSn: Long
): BaseMessage
Producer/Consumer 에서 사용할 message에 타입을 정의 합니다.
“name“에 정의한 이름이 message body에 “type” 컬럼으로 들어갑니다.
Consumer에서 name 을 기반으로 @Payload 에서 적절한 타입으로 바꾸도록 합니다.
@ConditionalOnProperty(
name = ["kafka.enabled"],
havingValue = "true",
matchIfMissing = false
)
@Configuration
@EnableKafka
class KafkaConsumerConfig(
@Value("\${kafka.bootstrap-servers}")
private val bootstrapServers: String,
) {
@Bean
fun kafkaListenerContainerFactory(): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Any>> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.consumerFactory = consumerFactory()
val errorHandler = DefaultErrorHandler(FixedBackOff(0L, 3L))
errorHandler.addNotRetryableExceptions(IllegalStateException::class.java)
errorHandler.addNotRetryableExceptions(SerializationException::class.java)
factory.setCommonErrorHandler(errorHandler)
// customConfig에서 Value를 String으로 가져왔고, 그걸 Json으로 치고 convert
val converter = StringJsonMessageConverter()
val typeMapper = DefaultJackson2JavaTypeMapper().apply {
typePrecedence = Jackson2JavaTypeMapper.TypePrecedence.INFERRED
classIdFieldName = "type" // 타입이 명시된 컬럼 이름
addTrustedPackages("*")
}
converter.typeMapper = typeMapper
factory.setRecordMessageConverter(converter)
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
return DefaultKafkaConsumerFactory(consumerConfig())
}
@Bean
fun consumerConfig(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
// StringJsonMessageConverter를 사용하기 위해서 String으로 먼저 deserialize 합니다.
props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = StringDeserializer::class.java
props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
// headeinfo를 사용하지 않고, RecordMessageConverter를 통해 타입을 추론합니다.
props[JsonDeserializer.USE_TYPE_INFO_HEADERS] = false
return props
}
}
@KafkaListener(
topics = ["test-topic"],
groupId = "\${kafka.group-id}"
)
fun handleTestMessage(
@Payload message: Test3Message
) {
// 테스트용 메시지 처리 로직
println("Received test message: ${message.message}")
}
RecordMessageConverter interface의 구현채인 StringJasonMessageConverter를 사용합니다.
body의 “type” 을 보고 @Payload message: Test3Message으로 자동 역직렬화를 수행합니다.
'Backend' 카테고리의 다른 글
Spring WebFlux와 비동기 처리 이해 (0) | 2025.04.30 |
---|---|
[MySql] 인덱싱 개념 정리 (0) | 2025.04.29 |
Kafka.00 카프카 개요 (0) | 2025.03.06 |
Lambda@Edge를 사용한 이미지 리사이징 적용기 (0) | 2025.02.25 |
Intellij 에서 Springboot 시작하기 (0) | 2023.01.15 |