본문 바로가기
Cloud

[Kafka] CDC 초기화 Script

by VENUSIM 2025. 1. 18.

개발 환경에 붙어 여러 테스트를 진행하며, connector가 kafka의 connector의 설정 정보를 포함한 connect-* topic을 읽어 올 수 없어 이슈가 자주 발생하여 문의에 대한 대응이 어려웠고 자동 초기화 기능을 구성하여 제공하였다.

 

reset.sh
> 카프카 토픽, 컨슈머 제거 스크립트

#!/bin/bash

$HOME/mysql-connector/stg/bin/scale.sh 0


set -e

NAMESPACE="infra"
LABEL="app=stg-mysql-source-connector"

KAFKA_BROKERS="10.105.90.26:9092,10.105.90.27:9092,10.105.90.33:9092"

DATE=$(date +'%Y-%m-%d')
LOG_DIR="/var/lib/jenkins/mysql-connector/stg/log"
KAFKA_DIR="/var/lib/jenkins/mysql-connector/kafka-cli"
LOG_FILE="$LOG_DIR/$DATE.log"

log() {
    echo "$(date +'%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}

filter_consumer_groups() {
    local groups=$1
    local exclude_group="KMOffsetCache-kaf-vsaidt-dev-m-2kme"
    local filtered_groups=()

    for group in $groups; do
        if [[ "$group" != "$exclude_group" ]]; then
            filtered_groups+=("$group")
        else
            log "Consumer Group $exclude_group는 삭제 대상에서 제외됩니다."
        fi
    done

    echo "${filtered_groups[@]}"
}

filter_topics_by_prefix() {
    local topics=$1
    local prefixes=("cdc-s1-" "stg-connect-" "STG1-")
    local filtered_topics=()
    
    for topic in $topics; do
        # 내부 토픽(__consumer_offsets 등)은 제외
        if [[ "$topic" == "__consumer_offsets" || "$topic" == "__transaction_state" ]]; then
            continue
        fi

        # 특정 접두사로 시작하는 토픽만 필터링
        for prefix in "${prefixes[@]}"; do
            if [[ "$topic" == "$prefix"* ]]; then
                filtered_topics+=("$topic")
                break
            fi
        done
    done

    echo "${filtered_topics[@]}"
}

while true; do
    POD_COUNT=$(kubectl get pods -n $NAMESPACE -l $LABEL --no-headers --kubeconfig $HOME/mysql-connector/kubeconfig.yaml 2>&1) || {
        log "Error: Failed to get pod count."
        exit 1
    }
    
    if [[ "$POD_COUNT" == *"No resources found"* ]]; then
        log "Pod의 개수가 0입니다. (No resources found in $NAMESPACE namespace)"
        POD_COUNT=0
    else
        POD_COUNT=$(echo "$POD_COUNT" | wc -l)
        log "현재 Pod 개수: $POD_COUNT"
    fi    
    
    if [ "$POD_COUNT" -eq 0 ]; then
        log "Pod의 개수가 0입니다."
        log "Kafka consumer group 삭제를 시작합니다."
        
        CONSUMER_GROUPS=$( $KAFKA_DIR/bin/kafka-consumer-groups.sh --list --bootstrap-server $KAFKA_BROKERS) || {
            log "Error: Failed to list Kafka consumer groups."
            exit 1
        }

        FILTERED_CONSUMER_GROUPS=$(filter_consumer_groups "$CONSUMER_GROUPS")
        echo "$FILTERED_CONSUMER_GROUPS" | xargs -I {} $KAFKA_DIR/bin/kafka-consumer-groups.sh --delete --group {} --bootstrap-server $KAFKA_BROKERS || {
            log "Error: Failed to delete Kafka consumer group {}"
            exit 1
        }

        log "Kafka consumer group 삭제 완료."
        log "Kafka topic 삭제를 시작합니다."
        
        TOPICS=$( $KAFKA_DIR/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_BROKERS ) || {
            log "Error: Failed to list Kafka topics."
            exit 1
        }
        
        # 특정 접두사로 시작하는 토픽 필터링
        FILTERED_TOPICS=$(filter_topics_by_prefix "$TOPICS")
	if [ -n "$FILTERED_TOPICS" ]; then
	    for topic in $FILTERED_TOPICS; do
	        log "토픽 $topic 삭제를 시도합니다."
        	$KAFKA_DIR/bin/kafka-topics.sh --delete --topic "$topic" --bootstrap-server $KAFKA_BROKERS || {
	            log "Error: Failed to delete Kafka topic $topic"
        	}
	    done
	    log "Kafka 토픽 삭제 완료: $FILTERED_TOPICS"
	else
	    log "삭제할 Kafka 토픽이 없습니다."
	fi
        break
    else
        log "Pod 개수는 여전히 $POD_COUNT 입니다. 계속 대기 중..."
    fi
    
    sleep 5
done

$HOME/mysql-connector/stg/bin/scale.sh 1

 

scale.sh

> deployment 스케일 변경 스크립트

#!/bin/bash

NAMESPACE="infra"
DEPLOYMENT_NAME="stg-mysql-source-connector"

DATE=$(date +'%Y-%m-%d')
LOG_DIR="/var/lib/jenkins/mysql-connector/stg/log"
LOG_FILE="$LOG_DIR/$DATE.log"

log() {
  echo "$(date +'%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}

REPLICA_COUNT=${1:-0}

log "스크립트 시작: stg-mysql-source-connector Deployment replica 수 $REPLICA_COUNT 으로 변경 시작"

kubectl get deployment $DEPLOYMENT_NAME -n $NAMESPACE --kubeconfig $HOME/mysql-connector/kubeconfig.yaml &> /dev/null

if [ $? -ne 0 ]; then
  log "오류: Deployment $DEPLOYMENT_NAME를 찾을 수 없습니다."
  exit 1
fi

log "레플리카 수를 $REPLICA_COUNT으로 변경 중: $DEPLOYMENT_NAME"
kubectl scale deployment $DEPLOYMENT_NAME --replicas=$REPLICA_COUNT -n $NAMESPACE --kubeconfig $HOME/mysql-connector/kubeconfig.yaml

if [ $? -eq 0 ]; then
  log "Deployment $DEPLOYMENT_NAME의 replica 수가 $REPLICA_COUNT으로 변경되었습니다."
else
  log "오류: Deployment $DEPLOYMENT_NAME의 replica 수 변경 실패."
  exit 1
fi

log "스크립트 종료: stg-mysql-source-connector Deployment replica 수 $REPLICA_COUNT으로 변경 완료"



운영 관리자 외에 kubectl을 사용할 수 있도록 기능이 제공되는 것은 보안적 이슈가 있다고 판단하여 Jenkins로 제공

pipeline {
    agent any
    environment {
        PATH = "${env.PATH}:${env.HOME}/bin"
        KUBE_CONFIG= "/var/lib/jenkins/kubeconfig.yaml"
    }
    stages {
        stage('reset') {
            steps {
                script {
                    sh '''
                      $HOME/mysql-connector/reset.sh
                    '''
                }
            }
        }
    }
}



댓글