connector 요청에서 에러가 발생했습니다.
{"name":"connector","connector":{"state":"RUNNING","worker_id":"ip:port"},"tasks":[{"id":0,"state":"FAILED","worker_id":"ip:port","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:290)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:351)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1055132 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"}],"type":"source"}
메세지 크기제한으로 CDC 데이터가 정상적으로 들어오지 않는 것으로 유추됩니다. 가이드 주신 대로 CDC 토픽은 필요한 테이블만 include 하여 사용중입니다.
확인부탁드립니다.
Kafka source connector에 connector 등록 중 위와 같은 이슈가 발생한다는 문의를 받았다.
해결 방안 제시
connector 생성하신 내용을 확인해 보니 producer max request size에 대한 조절이 안되어 있는 것으로 확인됩니다.
producer, consumer 기본 설정에 대해 override가 가능합니다.
기존 connector 생성시 사용하셨던 설정에 추가적으로
"producer.override.max.request.size":"2097152" 기본 값에 2배에 해당하는 값으로 설정하였습니다.
curl -X POST localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "none",
"database.user": "user",
"database.server.id": "3318377",
"database.history.kafka.bootstrap.servers": "broker1,broker2,broker3",
"database.history.kafka.topic": "DEV_MYSQL_HISTORY",
"database.server.name": "CDC_TEST",
"database.port": "3306",
"database.history.recovery.mode": "SCHEMA_ONLY_RECOVERY",
"database.hostname": "databaseendpoint",
"database.password": "password",
"name": "connector",
"table.include.list": "test.article,test.article_lib,...,test.textbookdeploy",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"snapshot.mode": "initial",
"producer.override.max.request.size": "2097152"
}
}'
connector에서 추가적인 설정을 하여도, broker에 설정을 초과하면 오류가 발생할 수 있습니다.
예를 들면 다음과 같습니다.
connector producer max request size 를 2mb 로 설정하였지만,
broker 의 message.max.bytes 는 1mb 인 상황일 경우 broker에서 message를 수신하지 못합니다.
broker의 설정도 필요한 상황이며, 모니터링을 진행하며 현재까지 발생한 오류는 전부 해소해 놓은 상태입니다.
broker 설정 방법은 다음과 같습니다.
- 안내 드린 카프카 터널링 서버 접속
- /kafka-cli/kafka/bin 의 kafka-configs.sh 를 통해 설정
- 아래 명령어 참고
Kafka config 수정
아래 명령어를 사용하시면 되시고, 2가지 사항만 변경해 주시면 됩니다.
--entity-name은 broker의 id입니다. 현재 3개의 broker (806,807,808) 가 있습니다.
--add-config 는 key=value 형태로 작성해 주시면 됩니다.
kafka-configs.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --entity-type brokers --entity-name 806 --alter --add-config message.max.bytes=5242880
Kafka config 확인
아래 명령어를 사용하시면 되시고, 2가지 사항만 변경해 주시면 됩니다.
--entity-name은 broker의 id입니다. 현재 3개의 broker (806,807,808) 가 있습니다.
| grep 을 통해 해당하는 설정을 필터하여 찾으실 수 있습니다.
kafka-configs.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --describe --all --entity-type brokers --entity-name 806 | grep message.max.bytes
'Cloud' 카테고리의 다른 글
[Kafka] CDC 인프라 구축 (feat. ksql) (0) | 2025.01.18 |
---|---|
[Kafka] CDC 초기화 Script (0) | 2025.01.18 |
[Gitlab] Backup, Restore 이후 500 에러 트러블 슈팅 (0) | 2025.01.13 |
[CSAP] K8S Node Init Script (1) | 2024.12.27 |
[ArgoCD] 계정 생성 (1) | 2024.12.27 |
댓글