Rlog

오늘자 삽질 - Spring Kafka 본문

Spring

오늘자 삽질 - Spring Kafka

dev_roach 2022. 4. 13. 22:07
728x90

회사에서 Spring Kafka 를 연동하던 중에 Local 에서 콘솔로 Topic 에 직접 Message 를 쏘니 잘 안되는 상황이 발생했다.

그래서 처음에는 아 key-serializer 와 value-serializer 가 잘못 설정되어 있나? 라는 생각을 했다.

 사실 내가 짠 코드가 아니여서 코드를 디버깅 좀 해보다가 처음에는 KafkaRecord 에 자꾸 코드에서 DeserializerException 을 넣길래 아니 이게 key-value deserializer 가 잘못되어 있는거 아니야? 하고 빼고 하니깐

Default Serializer 인 StringDeserializer 로 값이 넘어와서 잘 보였다.

하지만 Object 로 Mapping 되지 않았다. 그래서 궁금해서 ErrorHandlingDeserializer 에 대해서 공부했다.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka

docs.spring.io

 

내 기준에서 쉽게 설명하면 사실 진짜 key/value deserlizer 는 아래 내가 properties 에 정의된 애들이였다.

spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

즉, 여기서 깨달은 사실은 설정값에는 문제가 없다는 것이다.

그래서 어 그러면 설정이 잘 안들어가나 하고 코드를 한번 까보았다.

kafka 내부 코드인데 setUpDelegate 를 통해서 내가 설정한 Json, String Deserializer 가 잘 적용되는걸 확인할 수 있었다.

그럼 뭐가 문제일까? 곰곰히 생각해보다가 예전에 Redis 를 했을때 ObjcetMapper 에 Package 명을 포함한 Type 이 올라가던 것이 기억났었다.

 

그래서 이것저것 알아보다 보니 우리의 Configuration 에서 아래와 같은 코드가 있었다.

JsonDeserializer.TRUSTED_PACKAGES

그래서 이게 무슨 옵션이지? 하고 파보니까 Kafka 에서 Json 을 Deserialize 할때 Type 에 Package 까지보는데 Package 를 "*" 로 처리해서 패키지가 같지 않아도, Type 만 같다면 Deserializer 를 해주는 것이였다.

 

여기서 확 감이 와서 아까 작동하던 Topic 안의 메세지를 열어봤는데 아래와 같았다.

{"xxKey":value}

근데 열어보니 Type 같은건 적혀있지를 않았다. 그래서 조금 더 알아보니까 Kafka 에서는 Header 에 Type 을 넣는다고 한다.

그래서 아래와 같이 명령문을 수정했다.

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--from-beginning \
--topic xx-topic \
--property print.headers=true

결과는 아래와 같았다.

__TypeId__:xx.xx.xx.xx.xx.xx$Common	{"xxKey":51373}

 내가 로컬에서 쏘던 헤더는 아래와 같았다.

NO_HEADERS	{"imrId":300}

앞으로 명심하고 내일 회사 위키에 내가 겪으면서 얻은 지식을 위키에 정리하고 마무리 해야겠다~!