본문 바로가기
IT/DEVELOP

Kafka `java.lang.ClassCastException: class [B cannot be cast to class com.example.YourCustomMessageObject`

by 콰나 | Quanna 2025. 8. 31.

 

Kafka 애플리케이션 개발 중 흔히 마주치는 에러 중 하나는 바로 java.lang.ClassCastException입니다.

특히 class [B cannot be cast to class com.example.YourCustomMessageObject와 같은 메시지는 많은 개발자를 혼란스럽게 만듭니다.

이 글은 해당 예외의 근본 원인을 깊이 있게 분석하고, 명확한 단계별 해결책과 실제 코드 예제를 제공하여 여러분의 Kafka 개발을 돕고자 합니다.

이 에러를 완벽히 이해하고 해결하여 보다 안정적인 메시징 시스템을 구축하는 데 기여할 것입니다.

Why? - Kafka `ClassCastException` 발생 원인 심층 분석

ClassCastException은 Kafka 메시지 소비 과정에서 데이터 타입 불일치가 발생했음을 나타냅니다.

이는 생산자(Producer)와 소비자(Consumer) 간의 데이터 계약이 제대로 지켜지지 않았을 때 주로 발생합니다.

Kafka 메시지 처리의 기본 원리

Kafka는 모든 메시지를 바이트 배열(`byte[]`) 형태로 처리하고 저장합니다.

즉, 어떤 종류의 데이터를 보내든 Kafka 브로커에는 최종적으로 바이트 시퀀스로 저장됩니다.

이러한 원시적인 형태의 데이터는 직렬화(Serialization)와 역직렬화(Deserialization) 과정을 통해 원하는 객체 형태로 변환됩니다.

직렬화(Serialization)와 역직렬화(Deserialization)의 역할

메시지를 Kafka에 전송할 때, 생산자는 객체를 바이트 배열로 변환하는 직렬화 과정을 거칩니다.

반대로, 소비자는 Kafka로부터 바이트 배열을 받아 다시 객체로 변환하는 역직렬화 과정을 수행합니다.

이때 생산자가 사용한 직렬화 방식과 소비자가 기대하는 역직렬화 방식이 일치하지 않으면 ClassCastException이 발생할 수 있습니다.

`class [B`의 의미

Java에서 [Bbyte[], 즉 바이트 배열 클래스를 나타냅니다.

에러 메시지에서 class [B cannot be cast to class com.example.YourCustomMessageObject는 Kafka 소비자가 byte[] 형태의 메시지를 수신했음을 의미합니다.

하지만 소비자는 이 바이트 배열을 com.example.YourCustomMessageObject 타입의 객체로 직접 형 변환하려고 시도하여 실패한 것입니다.

`com.example.YourCustomMessageObject`의 의미

com.example.YourCustomMessageObject는 애플리케이션에서 정의한 사용자 지정 Java 객체입니다.

소비자는 Kafka로부터 받은 바이트 배열을 이 특정 객체 타입으로 역직렬화하기를 기대하고 있습니다.

에러 발생 시나리오 분석

주로 다음과 같은 시나리오에서 이 에러가 발생합니다.

  • **생산자가 기본 `ByteArraySerializer` 또는 `StringSerializer`를 사용했을 때:** 생산자가 데이터를 String 또는 byte[] 형태로 직렬화하여 Kafka에 보냈습니다.
  • **소비자가 커스텀 객체 `Deserializer`를 기대할 때:** 소비자의 설정은 YourCustomMessageObject 타입의 메시지를 받아 역직렬화하도록 되어 있습니다.
  • **소비자 설정에서 `key.deserializer` 또는 `value.deserializer`가 잘못 지정되었을 때:** 특히 value.deserializer 설정이 중요합니다. 바이트 배열을 그대로 받아서 객체로 캐스팅하려고 시도할 때 발생합니다.
  • **Spring Kafka 사용 시 `spring.kafka.properties.spring.json.value.default.type` 등 자동 설정 문제:** Spring Kafka는 편리한 자동 설정을 제공하지만, 이 설정이 실제 메시지 타입과 불일치할 때 문제가 발생할 수 있습니다.

코드 예제: 에러 재현 상황

다음은 에러를 재현할 수 있는 일반적인 상황의 코드 예제입니다.

생산자는 단순한 문자열을 보내지만, 소비자는 이를 커스텀 객체로 받으려 합니다.

Producer (문자열 메시지 전송)
package com.example.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class SimpleProducer {

    private static final String TOPIC_NAME = "my-test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate; // String 키, String 값 설정

    public void sendMessage(String message) {
        System.out.println("Producing message: " + message);
        kafkaTemplate.send(TOPIC_NAME, message);
    }
}
Consumer (커스텀 객체로 역직렬화 시도)
package com.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class CustomObjectConsumer {

    @KafkaListener(topics = "my-test-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, com.example.YourCustomMessageObject> record) {
        // 이곳에서 YourCustomMessageObject로 타입 캐스팅 오류 발생 가능성
        System.out.println("Received message: " + record.value().toString());
    }
}
Consumer Configuration (예상되는 오류 설정)
# application.yml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 이 부분이 Custom Object Deserializer가 아니라 StringDeserializer일 때 발생
      # 또는 기본 Deserializer가 설정되어 있지 않아 ByteArrayDeserializer가 암시적으로 사용될 때

위 코드에서 SimpleProducerStringSerializer를 사용하여 문자열을 전송합니다. 하지만 CustomObjectConsumerYourCustomMessageObject 타입으로 메시지를 받으려 합니다.

만약 소비자 설정에서 value-deserializerStringDeserializer로 되어 있거나, 혹은 아무런 설정 없이 Spring Boot의 기본값이 StringDeserializer로 동작하고 있다면, ConsumerRecord<String, String>이 전달될 것입니다.

하지만 리스너 메소드의 파라미터가 ConsumerRecord<String, com.example.YourCustomMessageObject>로 선언되어 있어 Spring Kafka는 StringYourCustomMessageObject로 변환하려 시도할 것입니다.

만약 더 근본적인 문제로 Kafka의 value.deserializer 설정 자체가 org.apache.kafka.common.serialization.ByteArrayDeserializer로 되어 있고, Spring Kafka의 타입 변환 메커니즘이 바이트 배열을 객체로 직접 캐스팅하려 한다면 ClassCastException: class [B cannot be cast to class com.example.YourCustomMessageObject가 발생합니다.

해결책: 단계별 가이드

이 에러를 해결하는 핵심은 생산자와 소비자의 직렬화/역직렬화 전략을 일치시키는 것입니다.

특히 소비자가 메시지를 올바른 타입으로 역직렬화할 수 있도록 보장해야 합니다.

1. 생산자와 소비자의 직렬화/역직렬화 전략 일치

가장 근본적인 해결책은 생산자와 소비자가 동일한 메시지 포맷과 처리 방식을 사용하는 것입니다.

옵션 1: `String` 또는 `byte[]` 메시지 사용

만약 메시지 내용이 단순한 문자열이거나 바이트 배열 그대로 처리해도 무방하다면, 가장 간단한 방법입니다.

생산자 설정 (예: `StringSerializer` 사용)

생산자는 메시지를 문자열로 직렬화하여 보냅니다.

# application.yml (Producer Configuration)
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # 문자열 직렬화
소비자 설정 (예: `StringDeserializer` 사용)

소비자도 메시지를 문자열로 역직렬화하도록 설정합니다.

# application.yml (Consumer Configuration)
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 문자열 역직렬화
Consumer Listener 코드
package com.example.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SimpleStringConsumer {

    @KafkaListener(topics = "my-test-topic", groupId = "my-group")
    public void listen(String message) { // String 타입으로 직접 수신
        System.out.println("Received message as String: " + message);
    }
    
    // 또는 ConsumerRecord를 사용하는 경우
    @KafkaListener(topics = "my-test-topic", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    public void listenRecord(org.apache.kafka.clients.consumer.ConsumerRecord<String, String> record) {
        System.out.println("Received message from record (String value): " + record.value());
    }
}

이 방법은 YourCustomMessageObject를 사용하려는 본래 의도와는 다를 수 있습니다.

만약 반드시 사용자 정의 객체를 사용해야 한다면 다음 옵션을 따릅니다.

옵션 2: 커스텀 객체(`YourCustomMessageObject`) 사용

메시지를 특정 Java 객체로 주고받으려면, 해당 객체를 바이트 배열로 직렬화하고 역직렬화하는 메커니즘이 필요합니다.

가장 일반적인 방법은 JSON 기반 직렬화를 사용하는 것입니다.

`YourCustomMessageObject` 클래스 정의

객체는 직렬화/역직렬화가 가능하도록 준비되어야 합니다.

여기서는 Jackson 라이브러리를 이용한 JSON 직렬화를 가정합니다.

package com.example.YourCustomMessageObject;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;

public class YourCustomMessageObject implements Serializable { // Serializable 구현은 선택적이나 권장됨
    private static final long serialVersionUID = 1L; // 버전 관리

    private String id;
    private String name;
    private int value;

    public YourCustomMessageObject() {
        // Jackson이 역직렬화할 때 필요한 기본 생성자
    }

    public YourCustomMessageObject(String id, String name, int value) {
        this.id = id;
        this.name = name;
        this.value = value;
    }

    @JsonProperty("id") // 필드명을 명시적으로 지정
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    @JsonProperty("name")
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @JsonProperty("value")
    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }

    @Override
    public String toString() {
        return "YourCustomMessageObject{" +
               "id='" + id + '\'' +
               ", name='" + name + '\'' +
               ", value=" + value +
               '}';
    }
}
커스텀 `Deserializer` 구현 (선택적이지만 유연성을 위해)

Spring Kafka의 JsonDeserializer를 사용하는 것이 일반적이지만, 직접 Deserializer를 구현하는 것도 가능합니다.

여기서는 JsonDeserializer의 동작 원리를 이해하기 위해 직접 구현하는 예를 보여줍니다.


package com.example.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.example.YourCustomMessageObject.YourCustomMessageObject;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CustomMessageObjectDeserializer implements Deserializer<YourCustomMessageObject> {

    private final ObjectMapper objectMapper = new ObjectMapper(); // Jackson ObjectMapper 사용

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 설정이 필요한 경우
    }

    @Override
    public YourCustomMessageObject deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // 바이트 배열을 UTF-8 문자열로 변환 후 JSON 파싱
            String jsonString = new String(data, StandardCharsets.UTF_8);
            return objectMapper.readValue(jsonString, YourCustomMessageObject.class);
        } catch (IOException e) {
            throw new RuntimeException("Error deserializing message", e);
        }
    }

    @Override
    public void close() {
        // 리소스 해제가 필요한 경우
    }
}
생산자 설정 및 코드 (커스텀 객체 전송)

생산자는 YourCustomMessageObject를 직렬화하여 전송합니다.

주로 JsonSerializer를 사용하거나, Spring Kafka의 KafkaTemplate<String, YourCustomMessageObject>를 사용하여 자동으로 JSON 직렬화를 수행하게 합니다.

# application.yml (Producer Configuration with JsonSerializer)
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # JSON 직렬화
      properties:
        spring.json.add.type.headers: false # 클래스 타입 헤더 추가 여부 (필요시 true)

생산자 코드:

package com.example.kafka;

import com.example.YourCustomMessageObject.YourCustomMessageObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class CustomObjectProducer {

    private static final String TOPIC_NAME = "my-test-topic";

    @Autowired
    private KafkaTemplate<String, YourCustomMessageObject> kafkaTemplate; // 커스텀 객체 타입 지정

    public void sendMessage(YourCustomMessageObject message) {
        System.out.println("Producing custom object: " + message);
        kafkaTemplate.send(TOPIC_NAME, message);
    }
}
소비자 설정 및 코드 (커스텀 객체 수신)

소비자는 생산자가 보낸 객체를 정확히 역직렬화할 수 있도록 설정합니다.

이때 `CustomMessageObjectDeserializer`를 사용하거나, Spring Kafka의 `JsonDeserializer`를 사용합니다.

Custom Deserializer 사용 시 `application.yml`
# application.yml (Consumer Configuration with Custom Deserializer)
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.example.kafka.CustomMessageObjectDeserializer # 직접 구현한 Deserializer 지정
Consumer Listener 코드
package com.example.kafka;

import com.example.YourCustomMessageObject.YourCustomMessageObject;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class CorrectCustomObjectConsumer {

    @KafkaListener(topics = "my-test-topic", groupId = "my-group")
    public void listen(YourCustomMessageObject message) { // 올바른 타입으로 직접 수신
        System.out.println("Received custom object: " + message.toString());
    }
}

2. Spring Kafka 사용 시 추가 고려사항

Spring Kafka는 Kafka 클라이언트 라이브러리 위에 추상화를 제공하여 개발을 편리하게 합니다.

따라서 JsonDeserializer와 관련된 설정을 정확히 이해하고 적용해야 합니다.

`JsonDeserializer`의 역할과 설정

Spring Kafka의 JsonDeserializer는 Jackson 라이브러리를 사용하여 JSON 형식의 메시지를 Java 객체로 역직렬화합니다.

이때, 역직렬화할 대상 클래스 정보를 명시해주는 것이 중요합니다.

`application.yml`을 통한 설정

JsonDeserializer를 사용할 때 spring.json.value.default.type 속성을 사용하여 기본 역직렬화 타입을 지정해야 합니다.

또한, 보안상의 이유로 역직렬화를 허용할 패키지를 spring.json.trusted.packages에 명시하는 것이 좋습니다.

# application.yml (Spring Kafka specific settings)
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # Spring Kafka의 JsonDeserializer 사용
      properties:
        spring.json.value.default.type: com.example.YourCustomMessageObject.YourCustomMessageObject # 기본 역직렬화 대상 클래스 지정
        spring.json.trusted.packages: com.example.* # 역직렬화를 허용할 패키지 지정 (보안 강화)
        # 만약 생산자가 __TypeId__ 헤더를 보내지 않는다면 위 default.type 설정이 필수적입니다.
        # 생산자가 __TypeId__ 헤더를 보내도록 설정하려면 spring.json.add.type.headers: true
Spring Kafka Consumer Listener 코드

설정이 올바르면, 리스너 메소드의 파라미터 타입만 정확히 지정하면 됩니다.

package com.example.kafka;

import com.example.YourCustomMessageObject.YourCustomMessageObject;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class SpringKafkaObjectConsumer {

    @KafkaListener(topics = "my-test-topic", groupId = "my-group")
    public void listen(YourCustomMessageObject message) {
        System.out.println("Received custom object via Spring Kafka: " + message.toString());
    }
}

KafkaListener 어노테이션의 containerFactory 속성을 통해 특정 컨테이너 팩토리를 지정할 수도 있습니다.

예를 들어, 여러 종류의 메시지를 처리해야 할 때 유용합니다.

3. 스키마 레지스트리 활용

복잡한 메시지 구조와 버전 관리가 필요한 환경에서는 Avro, Protobuf와 같은 스키마 기반 직렬화 방식을 고려할 수 있습니다.

Confluent Schema Registry와 함께 사용하면 데이터 계약을 중앙에서 관리하고, 직렬화/역직렬화 문제를 더욱 견고하게 해결할 수 있습니다.

스키마 레지스트리를 사용하면 생산자와 소비자가 동일한 스키마를 사용하여 데이터를 주고받음을 보장하여 ClassCastException과 같은 타입 불일치 문제를 원천적으로 방지합니다.

이는 데이터의 진화와 호환성 관리에 매우 강력한 도구입니다.

팁 - 안정적인 Kafka 메시징 시스템 구축을 위한 조언

ClassCastException 해결을 넘어, 더 견고하고 안정적인 Kafka 시스템을 구축하기 위한 실질적인 조언입니다.

로그 확인의 중요성

항상 에러 로그와 스택 트레이스를 주의 깊게 확인하세요.

에러 메시지에 포함된 정확한 클래스 이름과 에러 발생 위치는 문제 해결의 가장 중요한 단서가 됩니다.

특히 Caused by: 부분을 통해 근본 원인을 파악할 수 있습니다.

Kafka 메시지 내용 직접 확인

때로는 소비자의 코드나 설정 문제가 아니라, 생산자가 의도치 않게 다른 형식의 메시지를 보내는 경우가 있습니다.

kafka-console-consumer와 같은 도구를 사용하여 실제 Kafka 토픽에 쌓인 메시지의 내용을 직접 확인해 보세요.

이를 통해 메시지가 어떤 형태로 저장되었는지 명확히 파악할 수 있습니다.

# 토픽의 모든 메시지를 읽고 내용 확인
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-test-topic --from-beginning --property print.key=true --property print.value=true

메시지 헤더 활용

Kafka 메시지는 키와 값 외에 사용자 정의 헤더를 포함할 수 있습니다.

메시지 헤더에 메시지 타입 정보나 버전 정보를 추가하여 소비자가 동적으로 역직렬화 전략을 결정하도록 구현할 수 있습니다.

이는 여러 종류의 객체를 하나의 토픽으로 보낼 때 유용합니다.

버전 관리

YourCustomMessageObject와 같은 객체의 구조가 변경될 때 직렬화/역직렬화 호환성 문제가 발생할 수 있습니다.

객체에 serialVersionUID를 명시하거나, 스키마 레지스트리를 통해 스키마 버전을 관리하여 이러한 문제를 예방해야 합니다.

이전 버전의 메시지를 새로운 버전의 소비자가 처리할 수 있도록 항상 고려해야 합니다.

통합 테스트의 중요성

생산자와 소비자는 밀접하게 연결된 시스템의 일부입니다.

항상 생산자와 소비자를 함께 테스트하여 직렬화/역직렬화 과정이 양쪽에서 완벽하게 동작하는지 확인해야 합니다.

통합 테스트는 이러한 종류의 ClassCastException을 배포 전에 발견하고 수정하는 데 결정적인 역할을 합니다.

결론

java.lang.ClassCastException: class [B cannot be cast to class com.example.YourCustomMessageObject는 Kafka 개발에서 흔히 발생하는 예외입니다.

이 에러의 핵심은 생산자와 소비자 간의 메시지 직렬화/역직렬화 방식 불일치에 있습니다.

이 글에서 제시된 단계별 가이드와 코드 예제를 통해 여러분은 문제의 원인을 정확히 파악하고 효과적으로 해결할 수 있을 것입니다.

생산자와 소비자의 데이터 계약을 명확히 하고, 적절한 직렬화/역직렬화 전략을 선택하며, Spring Kafka 설정을 정확히 구성하는 것이 중요합니다.

이를 통해 더욱 안정적이고 견고한 Kafka 기반 애플리케이션을 구축할 수 있기를 바랍니다.

  •  
반응형