본문 바로가기
Cloud

[Kafka] Message size 제한 이슈 대응기

by VENUSIM 2025. 1. 18.
connector 요청에서 에러가 발생했습니다.

{"name":"connector","connector":{"state":"RUNNING","worker_id":"ip:port"},"tasks":[{"id":0,"state":"FAILED","worker_id":"ip:port","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:290)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:351)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1055132 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"}],"type":"source"}

메세지 크기제한으로 CDC 데이터가 정상적으로 들어오지 않는 것으로 유추됩니다. 가이드 주신 대로 CDC 토픽은 필요한 테이블만 include 하여 사용중입니다.
확인부탁드립니다. 

 

Kafka source connector에 connector 등록 중 위와 같은 이슈가 발생한다는 문의를 받았다.

 

해결 방안 제시

connector 생성하신 내용을 확인해 보니 producer max request size에 대한 조절이 안되어 있는 것으로 확인됩니다.
producer, consumer 기본 설정에 대해 override가 가능합니다.

https://docs.confluent.io/platform/current/connect/references/allconfigs.html?&_ga=2.111770500.1013123942.1733465063-674917880.1731052704&_gac=1.154789450.1733374956.CjwKCAiAmMC6BhA6EiwAdN5iLUNXo2vOCgpt-Gj2eMWWvEkl6n8Mz00gWWtXM9itSN7bQnbtlrCgzxoCQI0QAvD_BwE&_gl=1*17axooi*_gcl_aw*R0NMLjE3MzMzNzQ5NTYuQ2p3S0NBaUFtTUM2QmhBNkVpd0FkTjVpTFVOWG8ydk9DZ3B0LUdqMmVNV1d2RWtsNm44TXowMGdXV3RYTTlpdFNON2JRbmJ0bHJDZ3p4b0NRSTBRQXZEX0J3RQ..*_gcl_au*MjAxMTE5MTA0My4xNzMxMDUyNzAz*_ga*Njc0OTE3ODgwLjE3MzEwNTI3MDQ.*_ga_D2D3EGKSGD*MTczMzQ2NTA2My4yMC4wLjE3MzM0NjUwNjMuNjAuMC4w#override-the-worker-configuration

 

 

 

기존 connector 생성시 사용하셨던 설정에 추가적으로 

"producer.override.max.request.size":"2097152" 기본 값에 2배에 해당하는 값으로 설정하였습니다.

 

curl -X POST localhost:8083/connectors \

   -H "Content-Type: application/json" \

   -d '{
  "name": "connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "snapshot.locking.mode": "none",
    "database.user": "user",
    "database.server.id": "3318377",
    "database.history.kafka.bootstrap.servers": "broker1,broker2,broker3",
    "database.history.kafka.topic": "DEV_MYSQL_HISTORY",
    "database.server.name": "CDC_TEST",
    "database.port": "3306",
    "database.history.recovery.mode": "SCHEMA_ONLY_RECOVERY",
    "database.hostname": "databaseendpoint",
    "database.password": "password",
    "name": "connector",
    "table.include.list": "test.article,test.article_lib,...,test.textbookdeploy",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "snapshot.mode": "initial",
    "producer.override.max.request.size": "2097152"
  }
}'

 

 

connector에서 추가적인 설정을 하여도, broker에 설정을 초과하면 오류가 발생할 수 있습니다.

 

예를 들면 다음과 같습니다.

connector producer max request size 를 2mb 로 설정하였지만,

broker 의  message.max.bytes 는 1mb 인 상황일 경우 broker에서 message를 수신하지 못합니다.

broker의 설정도 필요한 상황이며, 모니터링을 진행하며 현재까지 발생한 오류는 전부 해소해 놓은 상태입니다.

broker 설정 방법은 다음과 같습니다.

  1. 안내 드린 카프카 터널링 서버 접속
  2. /kafka-cli/kafka/bin 의 kafka-configs.sh 를 통해 설정
  3. 아래 명령어 참고

Kafka config 수정

아래 명령어를 사용하시면 되시고,  2가지 사항만 변경해 주시면 됩니다.
 --entity-name은 broker의 id입니다. 현재 3개의 broker (806,807,808) 가 있습니다.
--add-config 는 key=value 형태로 작성해 주시면 됩니다.

kafka-configs.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --entity-type brokers --entity-name 806 --alter --add-config message.max.bytes=5242880

 

Kafka config 확인

아래 명령어를  사용하시면 되시고, 2가지 사항만 변경해 주시면 됩니다.
 --entity-name은 broker의 id입니다. 현재 3개의 broker (806,807,808) 가 있습니다.
| grep 을 통해 해당하는 설정을 필터하여 찾으실 수 있습니다.

kafka-configs.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --describe --all --entity-type brokers --entity-name 806 | grep message.max.bytes

 

 

 

위 이슈로 connector에서 exception이 발생하며 정지되는 현상이 발생하였고,

그로 인해 topic, connector 등에 초기화 작업이 이루어 져야했다.

https://dev-junhee.tistory.com/91

 

[Kafka] CDC 초기화 Script

개발 환경에 붙어 여러 테스트를 진행하며, connector가 kafka의 connector의 설정 정보를 포함한 connect-* topic을 읽어 올 수 없어 이슈가 자주 발생하여 문의에 대한 대응이 어려웠고 자동 초기화 기능

dev-junhee.tistory.com

 

 

운영을 하면서 다양한 이슈를 직면하였고, message size에 대한 개선 및 connector의 설정을 조금 더 상세하게 커스텀하고 로직을 추가하여 풀어나가야 되겠다는 결론으로 작업을 진행해보려한다.

 

https://dev-junhee.tistory.com/100

 

[Kafka] Message Size 이슈 개선

다방면에서 CDC를 도입하여 사용하고 있으나, 통계 대시보드 등에서 message size 이슈가 발생하였다.기존에는 connector의 producer request max size, broker 의 message max size 수치 조절을 통해 이슈를 해소하였

dev-junhee.tistory.com

 

댓글