개발 환경에 붙어 여러 테스트를 진행하며, connector가 kafka의 connector의 설정 정보를 포함한 connect-* topic을 읽어 올 수 없어 이슈가 자주 발생하여 문의에 대한 대응이 어려웠고 자동 초기화 기능을 구성하여 제공하였다.
reset.sh
> 카프카 토픽, 컨슈머 제거 스크립트
#!/bin/bash
$HOME/mysql-connector/stg/bin/scale.sh 0
set -e
NAMESPACE="infra"
LABEL="app=stg-mysql-source-connector"
KAFKA_BROKERS="10.105.90.26:9092,10.105.90.27:9092,10.105.90.33:9092"
DATE=$(date +'%Y-%m-%d')
LOG_DIR="/var/lib/jenkins/mysql-connector/stg/log"
KAFKA_DIR="/var/lib/jenkins/mysql-connector/kafka-cli"
LOG_FILE="$LOG_DIR/$DATE.log"
log() {
echo "$(date +'%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}
filter_consumer_groups() {
local groups=$1
local exclude_group="KMOffsetCache-kaf-vsaidt-dev-m-2kme"
local filtered_groups=()
for group in $groups; do
if [[ "$group" != "$exclude_group" ]]; then
filtered_groups+=("$group")
else
log "Consumer Group $exclude_group는 삭제 대상에서 제외됩니다."
fi
done
echo "${filtered_groups[@]}"
}
filter_topics_by_prefix() {
local topics=$1
local prefixes=("cdc-s1-" "stg-connect-" "STG1-")
local filtered_topics=()
for topic in $topics; do
# 내부 토픽(__consumer_offsets 등)은 제외
if [[ "$topic" == "__consumer_offsets" || "$topic" == "__transaction_state" ]]; then
continue
fi
# 특정 접두사로 시작하는 토픽만 필터링
for prefix in "${prefixes[@]}"; do
if [[ "$topic" == "$prefix"* ]]; then
filtered_topics+=("$topic")
break
fi
done
done
echo "${filtered_topics[@]}"
}
while true; do
POD_COUNT=$(kubectl get pods -n $NAMESPACE -l $LABEL --no-headers --kubeconfig $HOME/mysql-connector/kubeconfig.yaml 2>&1) || {
log "Error: Failed to get pod count."
exit 1
}
if [[ "$POD_COUNT" == *"No resources found"* ]]; then
log "Pod의 개수가 0입니다. (No resources found in $NAMESPACE namespace)"
POD_COUNT=0
else
POD_COUNT=$(echo "$POD_COUNT" | wc -l)
log "현재 Pod 개수: $POD_COUNT"
fi
if [ "$POD_COUNT" -eq 0 ]; then
log "Pod의 개수가 0입니다."
log "Kafka consumer group 삭제를 시작합니다."
CONSUMER_GROUPS=$( $KAFKA_DIR/bin/kafka-consumer-groups.sh --list --bootstrap-server $KAFKA_BROKERS) || {
log "Error: Failed to list Kafka consumer groups."
exit 1
}
FILTERED_CONSUMER_GROUPS=$(filter_consumer_groups "$CONSUMER_GROUPS")
echo "$FILTERED_CONSUMER_GROUPS" | xargs -I {} $KAFKA_DIR/bin/kafka-consumer-groups.sh --delete --group {} --bootstrap-server $KAFKA_BROKERS || {
log "Error: Failed to delete Kafka consumer group {}"
exit 1
}
log "Kafka consumer group 삭제 완료."
log "Kafka topic 삭제를 시작합니다."
TOPICS=$( $KAFKA_DIR/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_BROKERS ) || {
log "Error: Failed to list Kafka topics."
exit 1
}
# 특정 접두사로 시작하는 토픽 필터링
FILTERED_TOPICS=$(filter_topics_by_prefix "$TOPICS")
if [ -n "$FILTERED_TOPICS" ]; then
for topic in $FILTERED_TOPICS; do
log "토픽 $topic 삭제를 시도합니다."
$KAFKA_DIR/bin/kafka-topics.sh --delete --topic "$topic" --bootstrap-server $KAFKA_BROKERS || {
log "Error: Failed to delete Kafka topic $topic"
}
done
log "Kafka 토픽 삭제 완료: $FILTERED_TOPICS"
else
log "삭제할 Kafka 토픽이 없습니다."
fi
break
else
log "Pod 개수는 여전히 $POD_COUNT 입니다. 계속 대기 중..."
fi
sleep 5
done
$HOME/mysql-connector/stg/bin/scale.sh 1
scale.sh
> deployment 스케일 변경 스크립트
#!/bin/bash
NAMESPACE="infra"
DEPLOYMENT_NAME="stg-mysql-source-connector"
DATE=$(date +'%Y-%m-%d')
LOG_DIR="/var/lib/jenkins/mysql-connector/stg/log"
LOG_FILE="$LOG_DIR/$DATE.log"
log() {
echo "$(date +'%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}
REPLICA_COUNT=${1:-0}
log "스크립트 시작: stg-mysql-source-connector Deployment replica 수 $REPLICA_COUNT 으로 변경 시작"
kubectl get deployment $DEPLOYMENT_NAME -n $NAMESPACE --kubeconfig $HOME/mysql-connector/kubeconfig.yaml &> /dev/null
if [ $? -ne 0 ]; then
log "오류: Deployment $DEPLOYMENT_NAME를 찾을 수 없습니다."
exit 1
fi
log "레플리카 수를 $REPLICA_COUNT으로 변경 중: $DEPLOYMENT_NAME"
kubectl scale deployment $DEPLOYMENT_NAME --replicas=$REPLICA_COUNT -n $NAMESPACE --kubeconfig $HOME/mysql-connector/kubeconfig.yaml
if [ $? -eq 0 ]; then
log "Deployment $DEPLOYMENT_NAME의 replica 수가 $REPLICA_COUNT으로 변경되었습니다."
else
log "오류: Deployment $DEPLOYMENT_NAME의 replica 수 변경 실패."
exit 1
fi
log "스크립트 종료: stg-mysql-source-connector Deployment replica 수 $REPLICA_COUNT으로 변경 완료"
운영 관리자 외에 kubectl을 사용할 수 있도록 기능이 제공되는 것은 보안적 이슈가 있다고 판단하여 Jenkins로 제공
pipeline {
agent any
environment {
PATH = "${env.PATH}:${env.HOME}/bin"
KUBE_CONFIG= "/var/lib/jenkins/kubeconfig.yaml"
}
stages {
stage('reset') {
steps {
script {
sh '''
$HOME/mysql-connector/reset.sh
'''
}
}
}
}
}
'Cloud' 카테고리의 다른 글
[Rclone] 파일 동기화 시스템 구축 (0) | 2025.01.31 |
---|---|
[Kafka] CDC 인프라 구축 (feat. ksql) (0) | 2025.01.18 |
[Kafka] Message size 제한 이슈 대응기 (0) | 2025.01.18 |
[Gitlab] Backup, Restore 이후 500 에러 트러블 슈팅 (0) | 2025.01.13 |
[CSAP] K8S Node Init Script (1) | 2024.12.27 |
댓글