kafka의 경우 cloud data streaming service를 이용하였다. Apache Kafka를 설치하고 설정하여 클러스터 형태로 제공해 주고, CMAK을 이용하여 클러스터 토픽 생성 및 변경 등 kafka 클러스터 관리 기능을 제공한다.

가이드를 베이스로 커스텀하여 구성하였다.
https://guide-gov.ncloud-docs.com/docs/ko/cdss-cdssoverview
설정한 브로커 수량만큼 서버가 생기고 (기본 3개), 생성 된 서버는 Target Group에서 가시적으로 확인 할 수 있다.

Broker ACG
브로커에 접근이 가능하도록 다음과 같이 브로커 포트에 대해 ACG 허용해 주었다.
아래는 적용한 ACG 목록이다.
- [ K8S node ] Source Connector (Debezium), KSQL Server
- container 서비스이기에 node subnet 허용
- Bastion (Jenkins 전환 이후 제거)
https://dev-junhee.tistory.com/91- kafka cli를 활용하기 위해 허용
- Network LoadBalanacer
- 개발자 로컬에서 접근하기 위해 Public LB Subnet 허용
Broker LB ACL
개발자 로컬 테스트를 위해 public lb 생성 (9100 - 9102 리스너 등록)


Debezium
kafka source connector 중 mysql db 변경 감지를 위해 debezium 이용하고 container 환경으로 구성
이미지 생성
container 기반으로 동작 시키기 위해 이미지 생성
connect-distributed.properties
# Kafka 브로커 주소 설정
bootstrap.servers=broker1-ip:9092,broker2-ip:9092,broker3-ip:9092
# Kafka Connect 그룹 ID 설정
group.id=connect-cluster
# JSON 포맷으로 메시지를 변환할 변환기 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 메시지 키와 값의 스키마 포함 여부 설정
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 기본 복제 팩터 설정 (broker 개수)
default.replication.factor=3
# 오프셋 저장할 Kafka 토픽 설정
offset.storage.topic=connect-offsets
# 오프셋 정보 복제 수량 설정 (broker 개수)
offset.storage.replication.factor=3
# Connector 설정 저장할 Kafka 토픽 설정
config.storage.topic=connect-configs
# 설정 정보 복제 수량 설정 (broker 개수)
config.storage.replication.factor=3
# Connector 상태 저장할 Kafka 토픽 설정
status.storage.topic=connect-status
# 상태 정보 복제 수량 설정 (broker 개수)
status.storage.replication.factor=3
# 오프셋 플러시 주기 설정 (밀리초 단위)
offset.flush.interval.ms=10000
# Kafka Connect HTTP 리스너 주소 설정
listeners=HTTP://:8083
# Kafka Connect 커넥터 플러그인 경로 설정
plugin.path=/opt/connector
Dockerfile
FROM apache/kafka:3.9.0
RUN mkdir -p /opt/connectors
RUN chmod 775 -R /opt/connectors
COPY connect-distributed.properties /opt/kafka/config/
COPY connector /opt/connector
RUN chmod +x /opt/kafka/bin/connect-distributed.sh
CMD ["/opt/kafka/bin/connect-distributed.sh", "/opt/kafka/config/connect-distributed.properties"]
/opt/connector에는 플러그인 들이 포함되어 있음 이 중 이미지 사이즈를 고려하여 debezium-connector-mysql 제외 제거
Broker public endpint
로컬 개발을 위한 public endpoint 제공
broker.com:9100, broker.com:9101, broker.com:9102
kafka-source-connector
https://docs.confluent.io/platform/current/connect/references/restapi.html
connector 생성 예제
curl -X POST t-connector.com/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "dev-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "dbendpoint",
"database.port": "3306",
"database.user": "user",
"database.password": "passwd",
"database.server.id": "3318376",
"database.server.name": "DEV_MYSQL",
"database.whitelist": "aidt_lms",
"database.history.kafka.bootstrap.servers": "broker1,broker2,broker3",
"database.history.kafka.topic": "DEV_MYSQL_HISTORY",
"snapshot.locking.mode": "none",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}'
ksql server
ksql restapi 예제
curl -X "POST" "https://ksql.com/ksql" \
-H "Accept: application/json" \
-d '{
"ksql": "show topics;",
"streamsProperties": {}
}'
Trouble shooting
https://dev-junhee.tistory.com/90
[Kafka] Message size 제한 이슈 대응기
connector 요청에서 에러가 발생했습니다.{"name":"connector","connector":{"state":"RUNNING","worker_id":"ip:port"},"tasks":[{"id":0,"state":"FAILED","worker_id":"ip:port","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable e
dev-junhee.tistory.com
'Cloud' 카테고리의 다른 글
[K8S] Pod Health Check (feat. 부하테스트) (1) | 2025.01.31 |
---|---|
[Rclone] 파일 동기화 시스템 구축 (0) | 2025.01.31 |
[Kafka] CDC 초기화 Script (0) | 2025.01.18 |
[Kafka] Message size 제한 이슈 대응기 (0) | 2025.01.18 |
[Gitlab] Backup, Restore 이후 500 에러 트러블 슈팅 (0) | 2025.01.13 |
댓글