https://dev-junhee.tistory.com/90
[Kafka] Message size 제한 이슈 대응기
connector 요청에서 에러가 발생했습니다.{"name":"connector","connector":{"state":"RUNNING","worker_id":"ip:port"},"tasks":[{"id":0,"state":"FAILED","worker_id":"ip:port","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable e
dev-junhee.tistory.com
위 글에서 이어지며, 트러블 슈팅에 대한 개선 포인트를 발견하고 개선 방식에 대해 설명하려 합니다.
다방면에서 CDC를 도입하여 사용하고 있으나, 통계 대시보드 등에서 message size 이슈가 발생하였다.
기존에는 connector의 producer request max size, broker 의 message max size 수치 조절을 통해 이슈를 해소하였으나, 점점 큰 데이터의 발생으로 60mb 이상의 메세지가 발생하는 현상이 생겼다.
Connector의 중지
큰 메세지로 인해 발생되는 이슈 포인트는 connector의 설정 중 event.processing.failure.hadling.mode를 기본 값인 fail로 처리하고 있어, exception 발생 시 connector가 중지되어 통계에 영향을 주는 현상이 발생하였다.
예상할 수 없는 exception으로 인한 connector의 중지를 해소하기 위해 탐구한 결과 connector의 설정에 대한 세밀한 고려가 필요하다고 생각이 들었다.
개선이 필요한 connector 설정 값
snapshot.mode=initial
event.processing.failure.hadling.mode=Fail
해당 설정은 connector 구동 당시의 data기준으로 snapshot을 생성하고, binlog 기준으로 변경된 데이터를 kafka topic에 적재하는 상태이다.
예상되는 이슈
Fail로 인한 connector가 중지되는 경우 재구동까지의 시간동안의 데이터의 유실이 발생한다.
해결 방안
event.processing.failure.hadling.mode 를 Warn으로 설정하여 로그를 기록하고
snapshot.mode=never로 설정하여, binlog를 처음부터 읽고 consume하는 쪽에서 offset과 같은 로직을 통해 가장 마지막으로 성공한 위치부터 다시 데이터를 처리하는 설계가 필요하다고 생각이 들었다.
이를 해소하기 위해 수치 조절을 통해 message size를 조절해도 되나 얼마나 큰 데이터로 변경이 될지 그리고 큰 데이터로 인해 replicatoin delay, disk 등 사이드 이펙트 발생에 대한 우려가 있어 개선이 필요하다고 느꼈다.
SMT 도입
개선을 위해 SMT (Single Message Transformation)를 도입하여 큰 메세지에 대한 개선을 요구하였고, 다음과 같은 설계를 고려하고 있다.
Single Message Transform(SMT)은 Kafka Connect 프레임워크에서 제공하는 기능으로, 데이터가 소스에서 싱크로 전달되는 과정에서 개별 메시지를 변환할 수 있게 해줍니다.
kafka connect producer max request size 이슈 해소를 위해 Custom SMT를 구현하였습니다.
https://docs.confluent.io/platform/current/connect/transforms/custom.html
Create Custom Kafka Connect Single Message Transforms for Confluent Platform | Confluent Documentation
If none of the available Single Message Transformations (SMTs) provide the necessary transformation, you can create your own. An important concept to understand first is that, generally, SMT implementations provide the bulk of the logic in an abstract clas
docs.confluent.io
구현 전 고려사항
커스텀 SMT로 제작 시, Hot Reloading 인터페이스를 구현해서 지속적으로 문제가 될만한 필드들을 추가 할 수 있도록 하면 어떨까 싶었다.
(org.apache.kafka.connect.reconfigurable.Reconfigurable 인터페이스 구현)
REST API로 개별 Connector의 설정 변경이 가능하니까, 설정 변경으로 제어가 가능할 것 같았다.
확장해서 생각해 보면, 나중에 추가되는 필드들에 대해 타입 검사를 하는 CDC를 만들어 두고 이 필드의 사이즈가 Medium Text, Long Text라고 하면 REST API Connector 설정 변경을 통해 자동화된 프로세스로도 개선 가능할 것으로 보였다.
이슈 포인트 컬럼
설계
SMT 구현 로직
connector 설정 중 colum.exclude.list 옵션을 사용하여 불 필요한 컬럼은 제외하고, 이외의 값 자체가 활용되지 않지만 존재 여부는 판단이 필요한 라지 데이터에 대해 SMT를 적용하여 설정한 값보다 값이 크면 설정한 value 값으로 치환는 로직으로 전환하였습니다.
Connector 생성 시 활용한 config list
name: 커넥터 고유 식별자
connector.class: MySQL용 Debezium 커넥터 클래스
database.hostname: 데이터베이스 호스트 주소
database.port: 데이터베이스 포트 번호
database.user: 데이터베이스 접속 유저
database.password: 데이터베이스 접속 비밀번호
database.server.id: MySQL 서버의 고유 식별자
database.server.name: CDC 서버의 논리적 이름
table.include.list: 모니터링할 테이블 목록 (토픽적재 테이블)
colum.exclude.list : 제외할 테이블 컬럼명
database.history.kafka.bootstrap.servers: 스키마 변경 이력 Kafka 브로커
database.history.kafka.topic: 스키마 변경 이력 토픽
topic.creation.enable: 토픽 자동 생성 여부
topic.creation.default.partitions: 토픽 기본 파티션 수
topic.creation.default.replication.factor: 토픽 기본 복제 팩터
transforms.sizeLimiter.type jar 파일 value값
transforms.sizeLimiter.field.name : SMT적용할 컬럼명
transforms.sizeLimiter.target.value : 적용된 컬럼 데이터 변경할 값
transforms.sizeLimiter.max.field.size.bytes smt에 mb 제한값
schema.history.internal.kafka.bootstrap.servers: 내부 스키마 이력 Kafka 브로커
schema.history.internal.kafka.topic: 내부 스키마 이력 토픽
snapshot.mode: 스냅샷 모드
snapshot.locking.mode: 스냅샷 잠금 모드
max.request.size: 최대 요청 크기
snapshot.fetch.size: 스냅샷 페치 크기
max.batch.size: 최대 배치 크기
snapshot.delay.ms: 스냅샷 지연 시간
max.queue.size: 최대 대기열 크기
queue.flush.timeout.ms: 대기열 플러시 타임아웃
'Cloud' 카테고리의 다른 글
[K8S] Threshold & Garbage Collect (0) | 2025.03.09 |
---|---|
로그 모니터링 시스템 개발기 (0) | 2025.02.13 |
[Locust] 대시보드 커스텀 (feat. websocket 부하테스트) (0) | 2025.01.31 |
[K8S] Pod Health Check (feat. 부하테스트) (1) | 2025.01.31 |
[Rclone] 파일 동기화 시스템 구축 (0) | 2025.01.31 |
댓글