본문 바로가기
Cloud

[Kafka] CDC 인프라 구축 (feat. ksql)

by VENUSIM 2025. 1. 18.

kafka의 경우 cloud data streaming service를 이용하였다. Apache Kafka를 설치하고 설정하여 클러스터 형태로 제공해 주고, CMAK을 이용하여 클러스터 토픽 생성 및 변경 등 kafka 클러스터 관리 기능을 제공한다.


가이드를 베이스로 커스텀하여 구성하였다.

https://guide-gov.ncloud-docs.com/docs/ko/cdss-cdssoverview

설정한 브로커 수량만큼 서버가 생기고 (기본 3개), 생성 된 서버는 Target Group에서 가시적으로 확인 할 수 있다.

 

Broker ACG
브로커에 접근이 가능하도록 다음과 같이 브로커 포트에 대해 ACG 허용해 주었다.
아래는 적용한 ACG 목록이다.

  • [ K8S node ] Source Connector (Debezium), KSQL Server
    • container 서비스이기에 node subnet 허용
  • Bastion (Jenkins 전환 이후 제거)
    https://dev-junhee.tistory.com/91
    • kafka cli를 활용하기 위해 허용
  • Network LoadBalanacer
    • 개발자 로컬에서 접근하기 위해 Public LB Subnet 허용
 


Broker LB ACL

개발자 로컬 테스트를 위해 public lb 생성 (9100 - 9102 리스너 등록)

“ broker-allow-ip” 라는 deny-allow group을 생성하여 acl 관리

 

Debezium

kafka source connector 중 mysql db 변경 감지를 위해 debezium 이용하고 container 환경으로 구성

 

이미지 생성

container 기반으로 동작 시키기 위해 이미지 생성

 

connect-distributed.properties

# Kafka 브로커 주소 설정
bootstrap.servers=broker1-ip:9092,broker2-ip:9092,broker3-ip:9092

# Kafka Connect 그룹 ID 설정
group.id=connect-cluster

# JSON 포맷으로 메시지를 변환할 변환기 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 메시지 키와 값의 스키마 포함 여부 설정
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 기본 복제 팩터 설정 (broker 개수)
default.replication.factor=3

# 오프셋 저장할 Kafka 토픽 설정
offset.storage.topic=connect-offsets
# 오프셋 정보 복제 수량 설정 (broker 개수)
offset.storage.replication.factor=3

# Connector 설정 저장할 Kafka 토픽 설정
config.storage.topic=connect-configs
# 설정 정보 복제 수량 설정 (broker 개수)
config.storage.replication.factor=3

# Connector 상태 저장할 Kafka 토픽 설정
status.storage.topic=connect-status
# 상태 정보 복제 수량 설정 (broker 개수)
status.storage.replication.factor=3

# 오프셋 플러시 주기 설정 (밀리초 단위)
offset.flush.interval.ms=10000

# Kafka Connect HTTP 리스너 주소 설정
listeners=HTTP://:8083

# Kafka Connect 커넥터 플러그인 경로 설정
plugin.path=/opt/connector

 

Dockerfile

FROM apache/kafka:3.9.0

RUN mkdir -p /opt/connectors
RUN chmod 775 -R /opt/connectors
COPY connect-distributed.properties /opt/kafka/config/
COPY connector /opt/connector
RUN chmod +x /opt/kafka/bin/connect-distributed.sh

CMD ["/opt/kafka/bin/connect-distributed.sh", "/opt/kafka/config/connect-distributed.properties"]

 

/opt/connector에는 플러그인 들이 포함되어 있음 이 중  이미지 사이즈를 고려하여 debezium-connector-mysql 제외 제거

 

 

Broker public endpint
로컬 개발을 위한 public endpoint 제공

broker.com:9100, broker.com:9101, broker.com:9102

 

kafka-source-connector

https://docs.confluent.io/platform/current/connect/references/restapi.html

connector 생성 예제

curl -X POST t-connector.com/connectors \
   -H "Content-Type: application/json" \
   -d '{
  "name": "dev-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "dbendpoint",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "passwd",
    "database.server.id": "3318376",
    "database.server.name": "DEV_MYSQL",
    "database.whitelist": "aidt_lms",
    "database.history.kafka.bootstrap.servers": "broker1,broker2,broker3",
    "database.history.kafka.topic": "DEV_MYSQL_HISTORY",
    "snapshot.locking.mode": "none",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}'

 

ksql server

https://docs.confluent.io/platform/current/ksqldb/developer-guide/ksqldb-rest-api/rest-api-reference.html

ksql restapi 예제

curl -X "POST" "https://ksql.com/ksql" \
     -H "Accept: application/json" \
     -d '{
  "ksql": "show topics;",
  "streamsProperties": {}
}'

 

 

Trouble shooting

https://dev-junhee.tistory.com/90

 

[Kafka] Message size 제한 이슈 대응기

connector 요청에서 에러가 발생했습니다.{"name":"connector","connector":{"state":"RUNNING","worker_id":"ip:port"},"tasks":[{"id":0,"state":"FAILED","worker_id":"ip:port","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable e

dev-junhee.tistory.com

 

 

댓글