Server
[Kafka/Flink] CDC 복구 스크립트
VENUSIM
2025. 6. 10. 15:42
데이터의 정합성의 문제, CDC 관련하여 빠른 대응 및 복구가 필요한 경우를 대비하여 start, stop 등의 자동화 스크립트를 구성하였다.
▷ 스크립트
더보기
#!/bin/bash
# 초기화 수정본
# 프로젝트 루트 디렉토리 설정 (스크립트가 있는 디렉토리)
PROJECT_ROOT="$(cd "$(dirname "$0")" && pwd)"
# 로그 파일 설정
LOG_DIR="${JENKINS_HOME}/schedule-flink/logs"
LOG_FILE="${LOG_DIR}/flink_connector_manager.log"
# 서버 구분을 위한 환경 변수 (R, B, 또는 S1)
SERVER_TYPE=${SERVER_TYPE:-"release"} # 기본값은 R 서버
JENKINS_USER=${JENKINS_USER}
JENKINS_TOKEN=${JENKINS_TOKEN}
KAFKAOPTION_DIR="${PROJECT_ROOT}/src/main/resources/kafkaOption"
# 실행할 환경 및 타입 옵션 배열 정의 - 서버 타입에 따라 다르게 설정
if [ "$SERVER_TYPE" = "release" ]; then
declare -a ENV_OPTIONS=(
"--env RENG --type eng"
"--env RMATH --type math"
)
# R 서버 변수 설정
ENG_KAFKA_CONNECTOR="https://r-engl-connector.example.com/connectors"
MATH_KAFKA_CONNECTOR="https://r-math-connector.example.com/connectors"
JOBMANAGER_URL="http://r-jobmanager.example.com:18888"
ENG_KAFKA_DIR="${KAFKAOPTION_DIR}/releaseEng"
MATH_KAFKA_DIR="${KAFKAOPTION_DIR}/releaseMath"
elif [ "$SERVER_TYPE" = "beta-2e" ]; then
declare -a ENV_OPTIONS=(
"--env BENG --type eng"
"--env BMATH --type math"
)
# B2e 서버 변수 설정
ENG_KAFKA_CONNECTOR="https://b-engl-connector.example.com/connectors"
MATH_KAFKA_CONNECTOR="https://b-math-connector.example.com/connectors"
JOBMANAGER_URL="http://b-engl-jobmanager.example.com:18888"
MATH_JOBMANAGER_URL="http://b-math-jobmanager.example.com:18888"
ENG_KAFKA_DIR="${KAFKAOPTION_DIR}/BETAENG"
MATH_KAFKA_DIR="${KAFKAOPTION_DIR}/BETAMATH"
elif [ "$SERVER_TYPE" = "dev" ]; then
declare -a ENV_OPTIONS=(
"--env DEV --type eng"
"--env DEV --type math"
)
# B2e 서버 변수 설정
COMMON_KAFKA_CONNECTOR="https://t-connector.example.com/connectors"
JOBMANAGER_URL="http://test-jobmanager.example.com:18888"
COMMON_KAFKA_DIR="${KAFKAOPTION_DIR}/DEV"
elif [ "$SERVER_TYPE" = "stg" ]; then
declare -a ENV_OPTIONS=(
"--env STG --type eng"
"--env STG --type math"
)
COMMON_KAFKA_CONNECTOR="https://s-connector.example.com/connectors"
JOBMANAGER_URL="http://t-jobmanager.example.com:18888"
COMMON_KAFKA_DIR="${KAFKAOPTION_DIR}/STG"
else
# S1 서버 환경 옵션
declare -a ENV_OPTIONS=(
"--env STG1 --type eng"
"--env STG1 --type math"
)
COMMON_KAFKA_CONNECTOR="https://s1-connector.example.com/connectors"
JOBMANAGER_URL="http://t-jobmanager.example.com:18888"
COMMON_KAFKA_DIR="${KAFKAOPTION_DIR}/STG1"
fi
JAR_DIR="${PROJECT_ROOT}/build/libs"
# 토픽 생성 대기 시간 (초) - 30분
TOPIC_WAIT_TIME=1800
# 로그 함수
log_message() {
# 로그 디렉토리 확인 및 생성
if [ ! -d "$LOG_DIR" ]; then
mkdir -p "$LOG_DIR"
fi
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> "$LOG_FILE"
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1"
}
# JSON 파일에서 커넥터 이름 추출 함수
extract_connector_name() {
local file=$1
# JSON 파일에서 'name' 필드 값 추출
local name=$(grep -o '"name"[[:space:]]*:[[:space:]]*"[^"]*"' $file | sed 's/"name"[[:space:]]*:[[:space:]]*"//g' | sed 's/"//g')
if [ -z "$name" ]; then
# 파일 이름에서 추출 (예: connector_v_lcms.json -> v_lcms)
name=$(basename $file .json | sed 's/connector_//g')
fi
echo "$name"
}
# 서버 유형에 따라 특정 Flink 작업만 중지하는 함수
stop_specific_flink_jobs() {
if [ "$SERVER_TYPE" = "release" ]; then
log_message "${SERVER_TYPE} 서버의 모든 Flink 작업 중지 시작"
stop_all_flink_jobs
elif [ "$SERVER_TYPE" = "beta-2e" ]; then
log_message "${SERVER_TYPE} 서버의 모든 Flink 작업 중지 시작"
stop_all_flink_jobs
stop_all_flink_jobs_math
else
log_message "${SERVER_TYPE} 서버의 특정 Flink 작업 중지 시작"
# 모든 작업 목록 가져오기
log_message "실행 중인 작업 목록 가져오기..."
JOBS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs" -H "Accept: application/json")
log_message "작업 목록 응답: $JOBS_RESPONSE"
# 실행 중인 작업 ID 추출
RUNNING_JOB_IDS=$(echo "$JOBS_RESPONSE" | grep -o '"id":"[^"]*","status":"RUNNING"' | grep -o '"id":"[^"]*"' | sed 's/"id":"//g' | sed 's/"//g')
if [ -z "$RUNNING_JOB_IDS" ]; then
log_message "실행 중인 작업을 찾을 수 없습니다."
else
log_message "처리할 작업 목록:"
log_message "$RUNNING_JOB_IDS"
for JOB_ID in $RUNNING_JOB_IDS; do
# 각 작업 상세 정보 조회
JOB_DETAIL_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs/${JOB_ID}" -H "Accept: application/json")
JOB_NAME=$(echo "$JOB_DETAIL_RESPONSE" | grep -o '"name":"[^"]*"' | sed 's/"name":"//g' | sed 's/"//g')
log_message "확인 중인 작업: $JOB_NAME (ID: $JOB_ID)"
# stg1 관련 작업인지 확인
if [[ "$JOB_NAME" == *"stg1"* ]] || [[ "$JOB_NAME" == *"flink-job-stg1-eng"* ]] || [[ "$JOB_NAME" == *"flink-job-stg1-math"* ]] && [ "$SERVER_TYPE" = "stg1" ]; then
log_message "${SERVER_TYPE} 관련 작업 발견: $JOB_NAME (ID: $JOB_ID). 중지 시도..."
# 작업 중지 시도
log_message "방법 1: PATCH 요청"
STOP_RESPONSE=$(curl -s -X PATCH "${JOBMANAGER_URL}/jobs/${JOB_ID}" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 1 실패, 방법 2: POST cancel"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${JOB_ID}/cancel" -H "Accept: application/json")
fi
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 2 실패, 방법 3: POST stop"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${JOB_ID}/stop" -H "Accept: application/json")
fi
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "모든 중지 방법 실패. 수동 조치 필요"
else
log_message "작업 중지 성공: $JOB_NAME"
fi
sleep 5
# 작업 상태 확인
JOB_STATUS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs/${JOB_ID}" -H "Accept: application/json")
JOB_STATE=$(echo "$JOB_STATUS_RESPONSE" | grep -o '"state":"[^"]*"' | sed 's/"state":"//g' | sed 's/"//g')
if [ -z "$JOB_STATE" ]; then
log_message "작업 상태를 가져올 수 없음. 작업이 이미 종료되었을 수 있음"
else
log_message "작업 $JOB_ID 현재 상태: $JOB_STATE"
if [[ "$JOB_STATE" == "RUNNING" ]]; then
log_message "아직 RUNNING, 추가 강제 종료 시도..."
for endpoint in "cancel" "stop" "terminate"; do
log_message "엔드포인트 시도: $endpoint"
FORCE_STOP=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${JOB_ID}/${endpoint}" -H "Accept: application/json")
if [[ "$FORCE_STOP" != *"errors"* ]]; then
log_message "엔드포인트 $endpoint 성공"
break
fi
done
fi
fi
elif [[ "$JOB_NAME" == *"stg"* ]] || [[ "$JOB_NAME" == *"flink-job-stg-eng"* ]] || [[ "$JOB_NAME" == *"flink-job-stg-math"* ]] && [ "$SERVER_TYPE" = "stg" ]; then
log_message "${SERVER_TYPE} 관련 작업 발견: $JOB_NAME (ID: $JOB_ID). 중지 시도..."
# 작업 중지 시도
log_message "방법 1: PATCH 요청"
STOP_RESPONSE=$(curl -s -X PATCH "${JOBMANAGER_URL}/jobs/${JOB_ID}" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 1 실패, 방법 2: POST cancel"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${JOB_ID}/cancel" -H "Accept: application/json")
fi
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 2 실패, 방법 3: POST stop"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${JOB_ID}/stop" -H "Accept: application/json")
fi
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "모든 중지 방법 실패. 수동 조치 필요"
else
log_message "작업 중지 성공: $JOB_NAME"
fi
sleep 5
# 작업 상태 확인
JOB_STATUS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs/${JOB_ID}" -H "Accept: application/json")
JOB_STATE=$(echo "$JOB_STATUS_RESPONSE" | grep -o '"state":"[^"]*"' | sed 's/"state":"//g' | sed 's/"//g')
if [ -z "$JOB_STATE" ]; then
log_message "작업 상태를 가져올 수 없음. 작업이 이미 종료되었을 수 있음"
else
log_message "작업 $JOB_ID 현재 상태: $JOB_STATE"
if [[ "$JOB_STATE" == "RUNNING" ]]; then
log_message "아직 RUNNING, 추가 강제 종료 시도..."
for endpoint in "cancel" "stop" "terminate"; do
log_message "엔드포인트 시도: $endpoint"
FORCE_STOP=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${JOB_ID}/${endpoint}" -H "Accept: application/json")
if [[ "$FORCE_STOP" != *"errors"* ]]; then
log_message "엔드포인트 $endpoint 성공"
break
fi
done
fi
fi
else
log_message "${SERVER_TYPE} 관련 없는 작업 무시: $JOB_NAME"
fi
sleep 2
done
# 최종 상태 확인
log_message "모든 작업 중지 요청 후 최종 상태 확인"
sleep 5
FINAL_JOBS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs" -H "Accept: application/json")
FINAL_RUNNING_COUNT=$(echo "$FINAL_JOBS_RESPONSE" | grep -c '"status":"RUNNING"')
if [ "$FINAL_RUNNING_COUNT" -gt 0 ]; then
log_message "경고: 아직 실행 중인 작업이 $FINAL_RUNNING_COUNT개 존재합니다."
else
log_message "모든 ${SERVER_TYPE}관련 작업이 중지되었습니다."
fi
fi
fi
log_message "Flink 작업 중지 프로세스 완료"
}
# 모든 Flink 작업 중지 함수 (R서버, B2e서버용)
stop_all_flink_jobs() {
log_message "모든 Flink 작업 중지 시작"
# 모든 작업 목록 가져오기
log_message "실행 중인 작업 목록 가져오기..."
JOBS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs" -H "Accept: application/json")
log_message "작업 목록 응답: $JOBS_RESPONSE"
# 실행 중인 작업 ID 추출 시도 - 여러 가능한 형식 처리
RUNNING_JOB_IDS=""
# 방법 1: 상태가 'RUNNING'인 작업 찾기
RUNNING_JOB_IDS=$(echo $JOBS_RESPONSE | grep -o '"id":"[^"]*","status":"RUNNING"' | grep -o '"id":"[^"]*"' | sed 's/"id":"//g' | sed 's/"//g')
# 추출된 작업이 없으면 모든 작업 ID 추출 시도
if [ -z "$RUNNING_JOB_IDS" ]; then
log_message "실행 중인 작업을 찾을 수 없습니다. 모든 작업 ID 추출 시도..."
RUNNING_JOB_IDS=$(echo $JOBS_RESPONSE | grep -o '"id":"[^"]*"' | sed 's/"id":"//g' | sed 's/"//g')
fi
# 작업 ID가 없으면 메시지 출력
if [ -z "$RUNNING_JOB_IDS" ]; then
log_message "현재 실행 중인 Flink 작업이 없습니다."
else
log_message "처리할 작업 목록:"
log_message "$RUNNING_JOB_IDS"
# 각 작업 중지
for job_id in $RUNNING_JOB_IDS; do
log_message "작업 중지 시도 중: $job_id"
# 방법 1: CURL에서 -d 옵션 사용하지 않음 (내용 없는 요청)
log_message "방법 1: 내용 없는 PATCH 요청으로 시도"
STOP_RESPONSE=$(curl -s -X PATCH "${JOBMANAGER_URL}/jobs/${job_id}" -H "Accept: application/json")
# 응답이 에러를 포함하는지 확인
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 1 실패, 방법 2 시도: POST 요청으로 작업 취소"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${job_id}/cancel" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 2 실패, 방법 3 시도: 직접 CANCEL 상태 시도"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${job_id}/cancel" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "모든 API 방법 실패, 마지막 방법 시도: Savepoint 없이 중지"
STOP_RESPONSE=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${job_id}/stop" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "모든 중지 시도 실패. 작업이 이미 중지되었거나 다른 방법 필요함"
else
log_message "Savepoint 없는 중지 요청 성공"
fi
else
log_message "CANCEL 요청 성공"
fi
else
log_message "취소 요청 성공"
fi
else
log_message "작업 중지 요청 성공"
fi
# 작업 상태 확인 대기
log_message "작업 상태 확인 전 5초 대기..."
sleep 5
# 현재 작업 상태 확인
JOB_STATUS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs/${job_id}" -H "Accept: application/json")
JOB_STATE=$(echo $JOB_STATUS_RESPONSE | grep -o '"state":"[^"]*"' | sed 's/"state":"//g' | sed 's/"//g')
if [ -z "$JOB_STATE" ]; then
log_message "작업 상태를 가져올 수 없음. 작업이 이미 종료되었을 수 있음"
else
log_message "작업 $job_id 현재 상태: $JOB_STATE"
# RUNNING 상태라면 더욱 강력한 방법 시도
if [[ "$JOB_STATE" == "RUNNING" ]]; then
log_message "작업이 여전히 실행 중입니다. 명시적 취소 시도..."
# Flink 버전에 따라 엔드포인트가 다를 수 있음
for endpoint in "cancel" "stop" "terminate"; do
log_message "엔드포인트 시도: $endpoint"
FORCE_STOP=$(curl -s -X POST "${JOBMANAGER_URL}/jobs/${job_id}/$endpoint" -H "Accept: application/json")
if [[ "$FORCE_STOP" != *"errors"* ]]; then
log_message "엔드포인트 $endpoint 성공"
break
fi
done
fi
fi
# 다음 작업으로 넘어가기 전 잠시 대기
sleep 2
done
# 최종 상태 확인
log_message "모든 작업 중지 요청 후 최종 상태 확인"
sleep 5
FINAL_JOBS_RESPONSE=$(curl -s -X GET "${JOBMANAGER_URL}/jobs" -H "Accept: application/json")
log_message "최종 작업 목록 응답: $FINAL_JOBS_RESPONSE"
# 여전히 실행 중인 작업 확인
STILL_RUNNING=$(echo $FINAL_JOBS_RESPONSE | grep -o '"state":"RUNNING"' | wc -l)
if [ "$STILL_RUNNING" -gt 0 ]; then
log_message "경고: $STILL_RUNNING개의 작업이 여전히 실행 중입니다."
log_message "추가 문제 해결이 필요할 수 있습니다."
else
log_message "모든 작업이 더 이상 'RUNNING' 상태가 아님"
fi
fi
log_message "Flink 작업 중지 프로세스 완료"
}
stop_all_flink_jobs_math() {
log_message "모든 Flink 작업 중지 시작"
# 모든 작업 목록 가져오기
log_message "실행 중인 작업 목록 가져오기..."
JOBS_RESPONSE=$(curl -s -X GET "${MATH_JOBMANAGER_URL}/jobs" -H "Accept: application/json")
log_message "작업 목록 응답: $JOBS_RESPONSE"
# 실행 중인 작업 ID 추출 시도 - 여러 가능한 형식 처리
RUNNING_JOB_IDS=""
# 방법 1: 상태가 'RUNNING'인 작업 찾기
RUNNING_JOB_IDS=$(echo $JOBS_RESPONSE | grep -o '"id":"[^"]*","status":"RUNNING"' | grep -o '"id":"[^"]*"' | sed 's/"id":"//g' | sed 's/"//g')
# 추출된 작업이 없으면 모든 작업 ID 추출 시도
if [ -z "$RUNNING_JOB_IDS" ]; then
log_message "실행 중인 작업을 찾을 수 없습니다. 모든 작업 ID 추출 시도..."
RUNNING_JOB_IDS=$(echo $JOBS_RESPONSE | grep -o '"id":"[^"]*"' | sed 's/"id":"//g' | sed 's/"//g')
fi
# 작업 ID가 없으면 메시지 출력
if [ -z "$RUNNING_JOB_IDS" ]; then
log_message "현재 실행 중인 Flink 작업이 없습니다."
else
log_message "처리할 작업 목록:"
log_message "$RUNNING_JOB_IDS"
# 각 작업 중지
for job_id in $RUNNING_JOB_IDS; do
log_message "작업 중지 시도 중: $job_id"
# 방법 1: CURL에서 -d 옵션 사용하지 않음 (내용 없는 요청)
log_message "방법 1: 내용 없는 PATCH 요청으로 시도"
STOP_RESPONSE=$(curl -s -X PATCH "${MATH_JOBMANAGER_URL}/jobs/${job_id}" -H "Accept: application/json")
# 응답이 에러를 포함하는지 확인
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 1 실패, 방법 2 시도: POST 요청으로 작업 취소"
STOP_RESPONSE=$(curl -s -X POST "${MATH_JOBMANAGER_URL}/jobs/${job_id}/cancel" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "방법 2 실패, 방법 3 시도: 직접 CANCEL 상태 시도"
STOP_RESPONSE=$(curl -s -X POST "${MATH_JOBMANAGER_URL}/jobs/${job_id}/cancel" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "모든 API 방법 실패, 마지막 방법 시도: Savepoint 없이 중지"
STOP_RESPONSE=$(curl -s -X POST "${MATH_JOBMANAGER_URL}/jobs/${job_id}/stop" -H "Accept: application/json")
if [[ "$STOP_RESPONSE" == *"errors"* ]]; then
log_message "모든 중지 시도 실패. 작업이 이미 중지되었거나 다른 방법 필요함"
else
log_message "Savepoint 없는 중지 요청 성공"
fi
else
log_message "CANCEL 요청 성공"
fi
else
log_message "취소 요청 성공"
fi
else
log_message "작업 중지 요청 성공"
fi
# 작업 상태 확인 대기
log_message "작업 상태 확인 전 5초 대기..."
sleep 5
# 현재 작업 상태 확인
JOB_STATUS_RESPONSE=$(curl -s -X GET "${MATH_JOBMANAGER_URL}/jobs/${job_id}" -H "Accept: application/json")
JOB_STATE=$(echo $JOB_STATUS_RESPONSE | grep -o '"state":"[^"]*"' | sed 's/"state":"//g' | sed 's/"//g')
if [ -z "$JOB_STATE" ]; then
log_message "작업 상태를 가져올 수 없음. 작업이 이미 종료되었을 수 있음"
else
log_message "작업 $job_id 현재 상태: $JOB_STATE"
# RUNNING 상태라면 더욱 강력한 방법 시도
if [[ "$JOB_STATE" == "RUNNING" ]]; then
log_message "작업이 여전히 실행 중입니다. 명시적 취소 시도..."
# Flink 버전에 따라 엔드포인트가 다를 수 있음
for endpoint in "cancel" "stop" "terminate"; do
log_message "엔드포인트 시도: $endpoint"
FORCE_STOP=$(curl -s -X POST "${MATH_JOBMANAGER_URL}/jobs/${job_id}/$endpoint" -H "Accept: application/json")
if [[ "$FORCE_STOP" != *"errors"* ]]; then
log_message "엔드포인트 $endpoint 성공"
break
fi
done
fi
fi
# 다음 작업으로 넘어가기 전 잠시 대기
sleep 2
done
# 최종 상태 확인
log_message "모든 작업 중지 요청 후 최종 상태 확인"
sleep 5
FINAL_JOBS_RESPONSE=$(curl -s -X GET "${MATH_JOBMANAGER_URL}/jobs" -H "Accept: application/json")
log_message "최종 작업 목록 응답: $FINAL_JOBS_RESPONSE"
# 여전히 실행 중인 작업 확인
STILL_RUNNING=$(echo $FINAL_JOBS_RESPONSE | grep -o '"state":"RUNNING"' | wc -l)
if [ "$STILL_RUNNING" -gt 0 ]; then
log_message "경고: $STILL_RUNNING개의 작업이 여전히 실행 중입니다."
log_message "추가 문제 해결이 필요할 수 있습니다."
else
log_message "모든 작업이 더 이상 'RUNNING' 상태가 아님"
fi
fi
log_message "Flink 작업 중지 프로세스 완료"
}
delete_connectors() {
if [ "$SERVER_TYPE" = "release" ] || [ "$SERVER_TYPE" = "beta-2e" ]; then
# R 서버용 - ENG 커넥터 삭제
log_message "${SERVER_TYPE}ENG 환경 커넥터 삭제 시작"
# 기존 커넥터 목록 가져오기
log_message "${SERVER_TYPE}ENG 환경 기존 커넥터 목록 확인 중..."
# ENG_EXISTING_CONNECTORS=$(curl -s "${ENG_KAFKA_CONNECTOR}")
log_message "기존 커넥터 목록: ${ENG_EXISTING_CONNECTORS}"
/var/lib/jenkins/mysql-connector/${SERVER_TYPE}/engl/reset.sh
# 기존 커넥터 삭제
# for connector in $(echo $ENG_EXISTING_CONNECTORS | grep -o '"[^"]*"' | sed 's/"//g'); do
# if [[ "$connector" != "[" && "$connector" != "]" ]]; then
# log_message "커넥터 삭제 중: $connector"
# DELETE_RESPONSE=$(curl -s -X DELETE "${ENG_KAFKA_CONNECTOR}/${connector}")
# log_message "삭제 응답: $DELETE_RESPONSE"
# # 각 요청 사이에 잠시 대기
# sleep 2
# fi
# done
log_message "${SERVER_TYPE}ENG 환경 커넥터 삭제 완료"
# R 서버용 - MATH 커넥터 삭제
log_message "${SERVER_TYPE}MATH 환경 커넥터 삭제 시작"
# 기존 커넥터 목록 가져오기
log_message "${SERVER_TYPE}MATH 환경 기존 커넥터 목록 확인 중..."
# MATH_EXISTING_CONNECTORS=$(curl -s "${MATH_KAFKA_CONNECTOR}")
log_message "기존 커넥터 목록: ${MATH_EXISTING_CONNECTORS}"
/var/lib/jenkins/mysql-connector/${SERVER_TYPE}/math/reset.sh
# 기존 커넥터 삭제
# for connector in $(echo $MATH_EXISTING_CONNECTORS | grep -o '"[^"]*"' | sed 's/"//g'); do
# if [[ "$connector" != "[" && "$connector" != "]" ]]; then
# log_message "커넥터 삭제 중: $connector"
# DELETE_RESPONSE=$(curl -s -X DELETE "${MATH_KAFKA_CONNECTOR}/${connector}")
# log_message "삭제 응답: $DELETE_RESPONSE"
# # 각 요청 사이에 잠시 대기
# sleep 2
# fi
# done
# log_message "${SERVER_TYPE}MATH 환경 커넥터 삭제 완료"
else
# S1 서버용 - 단일 커넥터 삭제
log_message "${SERVER_TYPE} 환경 커넥터 삭제 시작"
# 기존 커넥터 목록 가져오기
log_message "${SERVER_TYPE} 환경 기존 커넥터 목록 확인 중..."
# EXISTING_CONNECTORS=$(curl -s "${COMMON_KAFKA_CONNECTOR}")
log_message "기존 커넥터 목록: ${EXISTING_CONNECTORS}"
/var/lib/jenkins/mysql-connector/${SERVER_TYPE}/reset.sh
# 기존 커넥터 삭제
# for connector in $(echo $EXISTING_CONNECTORS | grep -o '"[^"]*"' | sed 's/"//g'); do
# if [[ "$connector" != "[" && "$connector" != "]" ]]; then
# log_message "커넥터 삭제 중: $connector"
# DELETE_RESPONSE=$(curl -s -X DELETE "${COMMON_KAFKA_CONNECTOR}/${connector}")
# log_message "삭제 응답: $DELETE_RESPONSE"
# # 각 요청 사이에 잠시 대기
# sleep 2
# fi
# done
log_message "S1 환경 커넥터 삭제 완료"
fi
}
check_flags() {
local args=$1
local dir="/var/lib/jenkins/mysql-connector"
local LMS_FLAG="${dir}/${args}/lmsflag"
local LCMS_FLAG="${dir}/${args}/lcmsflag"
while true; do
if [[ -f "$LMS_FLAG" && -f "$LCMS_FLAG" ]]; then
VALUE1=$(tr -d '[:space:]' < "$LMS_FLAG")
VALUE2=$(tr -d '[:space:]' < "$LCMS_FLAG")
echo "LMS_FLAG: $VALUE1, LCMS_FLAG: $VALUE2"
if [[ "$VALUE1" == "true" && "$VALUE2" == "true" ]]; then
echo "두 플래그가 모두 'true'입니다. 종료합니다."
break
fi
else
echo "파일이 존재하지 않습니다: $([[ ! -f "$LMS_FLAG" ]] && echo LMS_FLAG) $([[ ! -f "$LCMS_FLAG" ]] && echo LCMS_FLAG)"
fi
log_message "$args 토픽 생성 대기 중입니다..."
sleep 10
done
}
basic_auth_request() {
local URL="$1"
local AUTH="${JENKINS_USER}:${JENKINS_TOKEN}"
local ENCODED=$(echo -n "$AUTH" | base64)
log_message "$URL"
log_message "$ENCODED"
curl -X GET "$URL" -H "Authorization: Basic $ENCODED"
}
# 커넥터 생성 함수 (서버 타입에 따라 다르게 동작)
create_connectors() {
if [ "$SERVER_TYPE" = "release" ] || [ "$SERVER_TYPE" = "beta-2e" ]; then
# R 서버용 - ENG 커넥터 생성
log_message "ENG 환경 커넥터 생성 시작"
# 디렉토리 확인
if [ ! -d "$ENG_KAFKA_DIR" ]; then
log_message "ENG 환경 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$ENG_KAFKA_DIR"
fi
# JSON 파일 확인
ENG_FILES=$(find "$ENG_KAFKA_DIR" -name "*.json" 2>/dev/null)
if [ -z "$ENG_FILES" ]; then
log_message "ENG 환경 설정 파일이 없습니다."
else
# 각 JSON 파일로 커넥터 생성
for connector_file in $ENG_KAFKA_DIR/*.json; do
if [ -f "$connector_file" ]; then
CONNECTOR_NAME=$(extract_connector_name "$connector_file")
log_message "ENG 환경 Kafka 커넥터 설정 중: $CONNECTOR_NAME (파일: $(basename $connector_file))"
# 커넥터 설정
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$ENG_KAFKA_CONNECTOR" -H "Content-Type: application/json" -d @"$connector_file")
HTTP_BODY=$(echo "$RESPONSE" | sed '$d')
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
log_message "응답: $HTTP_BODY"
if [ "$HTTP_CODE" -ne 201 ]; then
echo "Error - $HTTP_CODE: 영어 컨넥터 생성에 실패하였습니다."
exit 1
fi
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
fi
log_message "ENG 환경 커넥터 생성 완료"
check_flags "$SERVER_TYPE/engl"
# R 서버용 - MATH 커넥터 생성
log_message "MATH 환경 커넥터 생성 시작"
# 디렉토리 확인
if [ ! -d "$MATH_KAFKA_DIR" ]; then
log_message "MATH 환경 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$MATH_KAFKA_DIR"
fi
# JSON 파일 확인
MATH_FILES=$(find "$MATH_KAFKA_DIR" -name "*.json" 2>/dev/null)
if [ -z "$MATH_FILES" ]; then
log_message "MATH 환경 설정 파일이 없습니다."
else
# 각 JSON 파일로 커넥터 생성
for connector_file in $MATH_KAFKA_DIR/*.json; do
if [ -f "$connector_file" ]; then
CONNECTOR_NAME=$(extract_connector_name "$connector_file")
log_message "MATH 환경 Kafka 커넥터 설정 중: $CONNECTOR_NAME (파일: $(basename $connector_file))"
# 커넥터 설정
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST $MATH_KAFKA_CONNECTOR -H "Content-Type: application/json" -d @"$connector_file")
HTTP_BODY=$(echo "$RESPONSE" | sed '$d')
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
log_message "응답: $HTTP_BODY"
if [ "$HTTP_CODE" -ne 201 ]; then
echo "Error - $HTTP_CODE: 수학 컨넥터 생성에 실패하였습니다."
exit 1
fi
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
fi
log_message "MATH 환경 커넥터 생성 완료"
check_flags "$SERVER_TYPE/math"
else
# S1 서버용 - 단일 커넥터 생성
log_message "${SERVER_TYPE} 환경 커넥터 생성 시작"
# 디렉토리 확인
if [ ! -d "$COMMON_KAFKA_DIR" ]; then
log_message "${SERVER_TYPE} 환경 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$COMMON_KAFKA_DIR"
fi
# JSON 파일 확인
S1_FILES=$(find "$COMMON_KAFKA_DIR" -name "*.json" 2>/dev/null)
if [ -z "$S1_FILES" ]; then
log_message "${SERVER_TYPE} 환경 설정 파일이 없습니다."
else
# 각 JSON 파일로 커넥터 생성
for connector_file in $COMMON_KAFKA_DIR/*.json; do
if [ -f "$connector_file" ]; then
CONNECTOR_NAME=$(extract_connector_name "$connector_file")
log_message "${SERVER_TYPE} 환경 Kafka 커넥터 설정 중: $CONNECTOR_NAME (파일: $(basename $connector_file))"
# 커넥터 설정
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST $COMMON_KAFKA_CONNECTOR -H "Content-Type: application/json" -d @"$connector_file")
HTTP_BODY=$(echo "$RESPONSE" | sed '$d')
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
log_message "응답: $HTTP_BODY"
if [ "$HTTP_CODE" -ne 201 ]; then
echo "Error - $HTTP_CODE: 컨넥터 생성에 실패하였습니다."
exit 1
fi
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
fi
log_message "${SERVER_TYPE} 환경 커넥터 생성 완료"
check_flags $SERVER_TYPE
fi
}
# Flink JAR 업로드 및 실행 함수
run_flink_jars() {
log_message "Flink JAR 업로드 및 실행 시작"
# 현재 디렉토리 저장
CURRENT_DIR=$(pwd)
log_message "JAR 파일 빌드 시작..."
cd "$PROJECT_ROOT"
log_message "빌드 디렉토리: $(pwd)"
# Java 버전 확인
JAVA_VERSION=$(java -version 2>&1 | head -1)
log_message "시스템 Java 버전: $JAVA_VERSION"
# Gradle 데몬 중지 후 빌드 실행
./gradlew --stop
log_message "Gradle 데몬 중지됨. 빌드 시작..."
# 빌드 실행 - 에러 출력 캡처
BUILD_OUTPUT=$(./gradlew shadowJar 2>&1)
BUILD_STATUS=$?
if [ $BUILD_STATUS -eq 0 ]; then
log_message "JAR 파일 빌드 성공"
log_message "빌드된 JAR 파일은 ${JAR_DIR} 디렉토리에 있습니다"
else
log_message "JAR 파일 빌드 실패. 오류:"
log_message "$BUILD_OUTPUT"
log_message "기존 JAR 파일로 진행합니다(있는 경우)."
fi
# 원래 디렉토리로 복귀
cd "$CURRENT_DIR"
# JAR 디렉토리 존재 확인
if [ ! -d "$JAR_DIR" ]; then
log_message "JAR 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$JAR_DIR"
fi
# JAR 파일 목록 확인
JAR_FILES=$(find "$JAR_DIR" -name "*.jar" 2>/dev/null)
if [ -z "$JAR_FILES" ]; then
log_message "JAR 파일을 찾을 수 없습니다."
log_message "JAR 파일 실행 단계 건너뜀"
return
fi
log_message "찾은 JAR 파일들:"
ls -la $JAR_DIR/*.jar | while read line; do
log_message "$line"
done
# B2e 서버의 경우 JobManager를 두 개 사용하므로 처리해야 함
if [ "$SERVER_TYPE" = "beta-2e" ]; then
# BENG JobManager에 먼저 각 JAR 파일 업로드 및 실행
for jar_file in $JAR_DIR/*.jar; do
if [ -f "$jar_file" ]; then
log_message "BENG JobManager로 전송 중인 파일: $(basename $jar_file)"
# ENG JobManager에 JAR 파일 업로드
UPLOAD_RESPONSE=$(curl -s -X POST -H "Expect:" -F "jarfile=@${jar_file}" "${JOBMANAGER_URL}/jars/upload")
# 업로드된 JAR의 ID 추출
JAR_ID=$(echo $UPLOAD_RESPONSE | grep -o '"filename":"[^"]*"' | sed 's/"filename":"//g' | sed 's/"//g')
# 전체 JAR 경로에서 ID 부분만 추출 (필요한 경우)
if [[ $JAR_ID == *"/"* ]]; then
JAR_ID=$(basename "$JAR_ID")
fi
if [ -z "$JAR_ID" ]; then
# 다른 방식으로 JAR ID 추출 시도
JAR_ID=$(echo $UPLOAD_RESPONSE | grep -o '"id":"[^"]*"' | head -1 | sed 's/"id":"//g' | sed 's/"//g')
if [ -z "$JAR_ID" ]; then
log_message "JAR 파일 업로드 실패 또는 ID 추출 실패: $(basename $jar_file)"
continue
fi
fi
log_message "BENG JobManager 업로드 성공, JAR ID: $JAR_ID"
# ENG 환경 옵션으로 JAR 실행
log_message "BENG JAR 실행 중: 옵션 --env BENG --type eng"
# JAR 실행 API 호출 - 옵션을 JSON 형식으로 전달
RUN_RESPONSE=$(curl -s -X POST \
"${JOBMANAGER_URL}/jars/${JAR_ID}/run" \
-H "Content-Type: application/json" \
-d "{\"programArgs\": \"--env BENG --type eng\"}")
# 실행 결과 출력
JOB_ID=$(echo $RUN_RESPONSE | grep -o '"jobid":"[^"]*"' | sed 's/"jobid":"//g' | sed 's/"//g')
if [ -z "$JOB_ID" ]; then
log_message "BENG JAR 실행 실패"
# 에러 메시지 확인
ERROR_MSG=$(echo $RUN_RESPONSE | grep -o '"errors":\[[^]]*\]' | sed 's/"errors":\[//g' | sed 's/\]//g')
if [ ! -z "$ERROR_MSG" ]; then
log_message "에러 메시지: $ERROR_MSG"
fi
else
log_message "ENG JAR 실행 성공: Job ID $JOB_ID"
basic_auth_request "https://jenkins.example.com/job/cloud/job/0-cdc-snapshot-finish-trigger-reset/buildWithParameters?token=snapshot&SERVER_TYPE=${SERVER_TYPE}"
fi
# 각 작업 간 대기 시간
sleep 2
fi
done
# BMATH JobManager에 각 JAR 파일 업로드 및 실행
for jar_file in $JAR_DIR/*.jar; do
if [ -f "$jar_file" ]; then
log_message "BMATH JobManager로 전송 중인 파일: $(basename $jar_file)"
# MATH JobManager에 JAR 파일 업로드
UPLOAD_RESPONSE=$(curl -s -X POST -H "Expect:" -F "jarfile=@${jar_file}" "${MATH_JOBMANAGER_URL}/jars/upload")
# 업로드된 JAR의 ID 추출
JAR_ID=$(echo $UPLOAD_RESPONSE | grep -o '"filename":"[^"]*"' | sed 's/"filename":"//g' | sed 's/"//g')
# 전체 JAR 경로에서 ID 부분만 추출 (필요한 경우)
if [[ $JAR_ID == *"/"* ]]; then
JAR_ID=$(basename "$JAR_ID")
fi
if [ -z "$JAR_ID" ]; then
# 다른 방식으로 JAR ID 추출 시도
JAR_ID=$(echo $UPLOAD_RESPONSE | grep -o '"id":"[^"]*"' | head -1 | sed 's/"id":"//g' | sed 's/"//g')
if [ -z "$JAR_ID" ]; then
log_message "JAR 파일 업로드 실패 또는 ID 추출 실패: $(basename $jar_file)"
continue
fi
fi
log_message "BMATH JobManager 업로드 성공, JAR ID: $JAR_ID"
# MATH 환경 옵션으로 JAR 실행
log_message "BMATH JAR 실행 중: 옵션 --env BMATH --type math"
# JAR 실행 API 호출 - 옵션을 JSON 형식으로 전달
RUN_RESPONSE=$(curl -s -X POST \
"${MATH_JOBMANAGER_URL}/jars/${JAR_ID}/run" \
-H "Content-Type: application/json" \
-d "{\"programArgs\": \"--env BMATH --type math\"}")
# 실행 결과 출력
JOB_ID=$(echo $RUN_RESPONSE | grep -o '"jobid":"[^"]*"' | sed 's/"jobid":"//g' | sed 's/"//g')
if [ -z "$JOB_ID" ]; then
log_message "BMATH JAR 실행 실패"
# 에러 메시지 확인
ERROR_MSG=$(echo $RUN_RESPONSE | grep -o '"errors":\[[^]]*\]' | sed 's/"errors":\[//g' | sed 's/\]//g')
if [ ! -z "$ERROR_MSG" ]; then
log_message "에러 메시지: $ERROR_MSG"
fi
else
log_message "MATH JAR 실행 성공: Job ID $JOB_ID"
basic_auth_request "https://jenkins.example.com/job/cloud/job/0-cdc-snapshot-finish-trigger-reset/buildWithParameters?token=snapshot&SERVER_TYPE=${SERVER_TYPE}"
fi
# 각 작업 간 대기 시간
sleep 2
fi
done
else
# R 서버와 S1 서버의 경우 기존 로직 사용
# 각 JAR 파일에 대해 순차적으로 처리
for jar_file in $JAR_DIR/*.jar; do
if [ -f "$jar_file" ]; then
log_message "전송 중인 파일: $(basename $jar_file)"
# JAR 파일 업로드
UPLOAD_RESPONSE=$(curl -s -X POST -H "Expect:" -F "jarfile=@${jar_file}" "${JOBMANAGER_URL}/jars/upload")
# 업로드된 JAR의 ID 추출
JAR_ID=$(echo $UPLOAD_RESPONSE | grep -o '"filename":"[^"]*"' | sed 's/"filename":"//g' | sed 's/"//g')
# 전체 JAR 경로에서 ID 부분만 추출 (필요한 경우)
if [[ $JAR_ID == *"/"* ]]; then
JAR_ID=$(basename "$JAR_ID")
fi
if [ -z "$JAR_ID" ]; then
# 다른 방식으로 JAR ID 추출 시도
JAR_ID=$(echo $UPLOAD_RESPONSE | grep -o '"id":"[^"]*"' | head -1 | sed 's/"id":"//g' | sed 's/"//g')
if [ -z "$JAR_ID" ]; then
log_message "JAR 파일 업로드 실패 또는 ID 추출 실패: $(basename $jar_file)"
continue
fi
fi
log_message "업로드 성공, JAR ID: $JAR_ID"
# 각 환경 옵션으로 JAR 실행
for options in "${ENV_OPTIONS[@]}"; do
log_message "JAR 실행 중: 옵션 $options"
# JAR 실행 API 호출 - 옵션을 JSON 형식으로 전달
RUN_RESPONSE=$(curl -s -X POST \
"${JOBMANAGER_URL}/jars/${JAR_ID}/run" \
-H "Content-Type: application/json" \
-d "{\"programArgs\": \"$options\"}")
# 실행 결과 출력
JOB_ID=$(echo $RUN_RESPONSE | grep -o '"jobid":"[^"]*"' | sed 's/"jobid":"//g' | sed 's/"//g')
if [ -z "$JOB_ID" ]; then
log_message "JAR 실행 실패: 옵션 $options"
# 에러 메시지 확인
ERROR_MSG=$(echo $RUN_RESPONSE | grep -o '"errors":\[[^]]*\]' | sed 's/"errors":\[//g' | sed 's/\]//g')
if [ ! -z "$ERROR_MSG" ]; then
log_message "에러 메시지: $ERROR_MSG"
fi
else
log_message "JAR 실행 성공: Job ID $JOB_ID (옵션: $options)"
basic_auth_request "https://jenkins.example.com/job/cloud/job/0-cdc-snapshot-finish-trigger-reset/buildWithParameters?token=snapshot&SERVER_TYPE=${SERVER_TYPE}"
fi
# 각 작업 간 대기 시간 (필요시 조정)
sleep 2
done
fi
done
fi
log_message "Flink JAR 업로드 및 실행 완료"
}
# 모든 서비스 중지
stop_all_services() {
log_message "====================================================="
log_message "모든 서비스 중지 프로세스 시작 (${SERVER_TYPE} 서버)"
# 1. 서버 타입에 따라 Flink 작업 중지
stop_specific_flink_jobs
# 2. 모든 커넥터 삭제 (서버 타입에 따라 다르게 동작)
delete_connectors
log_message "모든 서비스 중지 프로세스 완료"
log_message "====================================================="
}
# 모든 서비스 시작
start_all_services() {
log_message "====================================================="
log_message "모든 서비스 시작 프로세스 시작 (${SERVER_TYPE} 서버)"
# 1. 커넥터 생성 (서버 타입에 따라 다르게 동작)
create_connectors
# 2. 토픽 생성을 위한 대기
# log_message "커넥터가 설정되었습니다. 토픽 생성을 위해 ${TOPIC_WAIT_TIME}초(30분) 대기합니다."
# log_message "대기 시작 시간: $(date '+%Y-%m-%d %H:%M:%S')"
# # 1분 간격으로 남은 시간 표시
# for ((i=TOPIC_WAIT_TIME; i>0; i-=60)); do
# if [ $i -ge 60 ]; then
# log_message "남은 대기 시간: $((i/60))분"
# sleep 60
# else
# log_message "남은 대기 시간: ${i}초"
# sleep $i
# break
# fi
# done
# log_message "대기 완료 시간: $(date '+%Y-%m-%d %H:%M:%S')"
# 3. Flink JAR 업로드 및 실행
run_flink_jars
log_message "모든 서비스 시작 프로세스 완료"
log_message "====================================================="
}
# 명령행 인자 처리
case "$1" in
stop)
log_message "수동 실행: 모든 서비스 중지"
stop_all_services
;;
start)
log_message "수동 실행: 모든 서비스 시작"
start_all_services
;;
stop-jobs)
log_message "수동 실행: Flink 작업만 중지"
stop_specific_flink_jobs
;;
stop-kafka)
if [ "$SERVER_TYPE" = "release" ] || [ "$SERVER_TYPE" = "beta-2e" ]; then
log_message "수동 실행: ENG 커넥터 중지"
# R 서버의 ENG 커넥터 삭제
log_message "RENG 환경 커넥터 삭제 시작"
# 기존 커넥터 목록 가져오기
log_message "RENG 환경 기존 커넥터 목록 확인 중..."
ENG_EXISTING_CONNECTORS=$(curl -s "${ENG_KAFKA_CONNECTOR}")
log_message "기존 커넥터 목록: ${ENG_EXISTING_CONNECTORS}"
# 기존 커넥터 삭제
for connector in $(echo $ENG_EXISTING_CONNECTORS | grep -o '"[^"]*"' | sed 's/"//g'); do
if [[ "$connector" != "[" && "$connector" != "]" ]]; then
log_message "커넥터 삭제 중: $connector"
DELETE_RESPONSE=$(curl -s -X DELETE "${ENG_KAFKA_CONNECTOR}/${connector}")
log_message "삭제 응답: $DELETE_RESPONSE"
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
log_message "RENG 환경 커넥터 삭제 완료"
log_message "수동 실행: MATH 커넥터 중지"
# R 서버의 MATH 커넥터 삭제
log_message "MATH 환경 커넥터 삭제 시작"
# 기존 커넥터 목록 가져오기
log_message "MATH 환경 기존 커넥터 목록 확인 중..."
MATH_EXISTING_CONNECTORS=$(curl -s "${MATH_KAFKA_CONNECTOR}")
log_message "기존 커넥터 목록: ${MATH_EXISTING_CONNECTORS}"
# 기존 커넥터 삭제
for connector in $(echo $MATH_EXISTING_CONNECTORS | grep -o '"[^"]*"' | sed 's/"//g'); do
if [[ "$connector" != "[" && "$connector" != "]" ]]; then
log_message "커넥터 삭제 중: $connector"
DELETE_RESPONSE=$(curl -s -X DELETE "${MATH_KAFKA_CONNECTOR}/${connector}")
log_message "삭제 응답: $DELETE_RESPONSE"
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
log_message "RMATH 환경 커넥터 삭제 완료"
else
log_message "수동 실행: ${SERVER_TYPE} 커넥터 중지"
# S1 서버의 커넥터 삭제
log_message "${SERVER_TYPE} 환경 커넥터 삭제 시작"
# 기존 커넥터 목록 가져오기
log_message "${SERVER_TYPE} 환경 기존 커넥터 목록 확인 중..."
EXISTING_CONNECTORS=$(curl -s "${COMMON_KAFKA_CONNECTOR}")
log_message "기존 커넥터 목록: ${EXISTING_CONNECTORS}"
# 기존 커넥터 삭제
for connector in $(echo $EXISTING_CONNECTORS | grep -o '"[^"]*"' | sed 's/"//g'); do
if [[ "$connector" != "[" && "$connector" != "]" ]]; then
log_message "커넥터 삭제 중: $connector"
DELETE_RESPONSE=$(curl -s -X DELETE "${COMMON_KAFKA_CONNECTOR}/${connector}")
log_message "삭제 응답: $DELETE_RESPONSE"
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
log_message "${SERVER_TYPE} 환경 커넥터 삭제 완료"
fi
;;
start-kafka)
if [ "$SERVER_TYPE" = "release" ] || [ "$SERVER_TYPE" = "beta-2e" ]; then
log_message "수동 실행: RENG 커넥터 시작"
# R 서버의 ENG 커넥터 생성
log_message "ENG 환경 커넥터 생성 시작"
# 디렉토리 확인
if [ ! -d "$ENG_KAFKA_DIR" ]; then
log_message "ENG 환경 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$ENG_KAFKA_DIR"
fi
# JSON 파일 확인
ENG_FILES=$(find "$ENG_KAFKA_DIR" -name "*.json" 2>/dev/null)
if [ -z "$ENG_FILES" ]; then
log_message "ENG 환경 설정 파일이 없습니다."
else
# 각 JSON 파일로 커넥터 생성
for connector_file in $ENG_KAFKA_DIR/*.json; do
if [ -f "$connector_file" ]; then
CONNECTOR_NAME=$(extract_connector_name "$connector_file")
log_message "ENG 환경 Kafka 커넥터 설정 중: $CONNECTOR_NAME (파일: $(basename $connector_file))"
# 커넥터 설정
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST $ENG_KAFKA_CONNECTOR -H "Content-Type: application/json" -d @"$connector_file")
HTTP_BODY=$(echo "$RESPONSE" | sed '$d')
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
log_message "응답: $HTTP_BODY"
if [ "$HTTP_CODE" -ne 201 ]; then
echo "Error - $HTTP_CODE: 영어 컨넥터 생성에 실패하였습니다."
exit 1
fi
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
fi
log_message "ENG 환경 커넥터 생성 완료"
######################################### 수학 ###########################################
log_message "수동 실행: MATH 커넥터 시작"
# R 서버의 MATH 커넥터 생성
log_message "MATH 환경 커넥터 생성 시작"
# 디렉토리 확인
if [ ! -d "$MATH_KAFKA_DIR" ]; then
log_message "MATH 환경 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$MATH_KAFKA_DIR"
fi
# JSON 파일 확인
MATH_FILES=$(find "$MATH_KAFKA_DIR" -name "*.json" 2>/dev/null)
if [ -z "$MATH_FILES" ]; then
log_message "MATH 환경 설정 파일이 없습니다."
else
# 각 JSON 파일로 커넥터 생성
for connector_file in $MATH_KAFKA_DIR/*.json; do
if [ -f "$connector_file" ]; then
CONNECTOR_NAME=$(extract_connector_name "$connector_file")
log_message "MATH 환경 Kafka 커넥터 설정 중: $CONNECTOR_NAME (파일: $(basename $connector_file))"
# 커넥터 설정
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST $MATH_KAFKA_CONNECTOR -H "Content-Type: application/json" -d @"$connector_file")
HTTP_BODY=$(echo "$RESPONSE" | sed '$d')
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
log_message "응답: $HTTP_BODY"
if [ "$HTTP_CODE" -ne 201 ]; then
echo "Error - $HTTP_CODE: 수학 컨넥터 생성에 실패하였습니다."
exit 1
fi
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
fi
log_message "MATH 환경 커넥터 생성 완료"
else
log_message "수동 실행: ${SERVER_TYPE} 커넥터 시작"
# S1 서버의 커넥터 생성
log_message "${SERVER_TYPE} 환경 커넥터 생성 시작"
# 디렉토리 확인
if [ ! -d "$COMMON_KAFKA_DIR" ]; then
log_message "${SERVER_TYPE} 환경 디렉토리가 존재하지 않습니다. 디렉토리를 생성합니다."
mkdir -p "$COMMON_KAFKA_DIR"
fi
# JSON 파일 확인
S1_FILES=$(find "$COMMON_KAFKA_DIR" -name "*.json" 2>/dev/null)
if [ -z "$S1_FILES" ]; then
log_message "${SERVER_TYPE} 환경 설정 파일이 없습니다."
else
# 각 JSON 파일로 커넥터 생성
for connector_file in $COMMON_KAFKA_DIR/*.json; do
if [ -f "$connector_file" ]; then
CONNECTOR_NAME=$(extract_connector_name "$connector_file")
log_message "${SERVER_TYPE} 환경 Kafka 커넥터 설정 중: $CONNECTOR_NAME (파일: $(basename $connector_file))"
# 커넥터 설정
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST $COMMON_KAFKA_CONNECTOR -H "Content-Type: application/json" -d @"$connector_file")
HTTP_BODY=$(echo "$RESPONSE" | sed '$d')
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
log_message "응답: $HTTP_BODY"
if [ "$HTTP_CODE" -ne 201 ]; then
echo "Error - $HTTP_CODE: 컨넥터 생성에 실패하였습니다."
exit 1
fi
# 각 요청 사이에 잠시 대기
sleep 2
fi
done
fi
log_message "${SERVER_TYPE} 환경 커넥터 생성 완료"
fi
;;
start-jars)
log_message "수동 실행: Flink JAR 실행"
run_flink_jars
;;
*)
echo "사용법: $0 [start|stop|stop-jobs|start-jars|start-kafka|stop-kafka]"
echo " start : 모든 서비스 시작 (커넥터 생성 및 JAR 실행)"
echo " stop : 모든 서비스 중지 (Flink 작업 중지 및 커넥터 삭제)"
echo " stop-jobs : Flink 작업만 중지"
echo " start-jars : Flink JAR 파일만 실행"
echo " start-Kafka : RMATH 커넥터만 시작"
echo " stop-kafka : RMATH 커넥터만 중지"
exit 1
;;
esac
exit 0
▷ Jenkins Pipeline
더보기
pipeline {
agent any
environment {
PATH = "${env.PATH}:${env.HOME}/bin"
TYPE = "${params.SERVER_TYPE}"
CMD = "${params.COMMAND}"
GIT_URL = "http://gitlab.example.com/cloud/flink-job.git"
GIT_BRANCH = "${params.BRANCH}"
}
stages {
stage('Clone') {
steps {
script {
checkout([$class: 'GitSCM',
branches: [[name: "${GIT_BRANCH}"]],
userRemoteConfigs: [[credentialsId: 'gitlab-access-token', url: "${GIT_URL}"]]
])
}
}
}
stage('command') {
steps {
script {
sh """
echo "org.gradle.java.home=/usr/lib/jvm/java-17-openjdk-amd64" > gradle.properties
SERVER_TYPE=$SERVER_TYPE JENKINS_USER=$JENKINS_USER JENKINS_TOKEN=$JENKINS_TOKEN ./cdc_manager.sh $CMD
"""
}
}
}
}
}
대표적으로 정지, 시작의 동작 과정과 구현 방식에 대해 설명하고자 한다.
정지
- Flink Job 제거
- Flink의 실행 중인 프로세스 중 관련되어 있는 JobId를 통해 Job을 중지 시킨다. - Kafka-Connect (pod) 를 삭제한다. (replica 0)
- connector만 제거하였을 경우 내부적인 결함이 생기는 경우가 있다.
connect-configs, connect-offsets, connect-status 3가지 토픽은 Connector의 장애 복구를 위해 Kafka-Connect에 Connector를 생성할 때 같이 토픽으로 생성된다.
- Kafka Topic 제거
- connector 생성 시 prefix를 지정해주고, 해당 prefix로 시작하는 토픽들을 필터링하여 삭제한다. - Kafka-Connect (pod) 를 생성한다. (replica 1)
- pod가 재시작 될 때, connect-* 토픽 3종이 존재하지 않을 경우 모두 초기화된다.
시작
- Kafka-Connect (pod)에 Connector를 생성한다.
- Connector가 생성한다.
- Connector에서 모든 토픽을 생성 후 Jenkins Trigger를 유발한다.
- 모든 토픽이 생성 된 후 Flink의 Job을 실행 시켜야 정상 구동 되기에 Flag값을 변경해 줌으로서
Flink에 Job을 등록해도 됨을 인지 시킨다.
▷ Trigger (Jenkins pipeline)
더보기
pipeline {
agent any
environment {
TYPE = "${params.SERVER_TYPE}"
SCHEMA = "${params.SCHEMA}"
}
stages {
stage('Check and Update Flag') {
steps {
script {
def flagFile = "${HOME}/mysql-connector/${TYPE}/${SCHEMA}flag"
if (fileExists(flagFile)) {
def content = readFile(flagFile).trim()
echo "현재 ${flagFile} 값: ${content}"
if (content == "false") {
writeFile file: flagFile, text: "true"
echo "${flagFile} 값을 true로 업데이트했습니다."
}
} else {
echo "${flagFile} 파일이 존재하지 않아 새로 생성합니다."
writeFile file: flagFile, text: "true"
}
}
}
}
}
}
Connector에서 모든 토픽을 생성했을 때 Trigger를 유발 시키기 위해 debezium Jar들을 커스텀하였으며, 소스 코드는 하단 깃에서 확인 가능하다. (1.7버전 브랜치를 확인하면 된다.)
https://github.com/VenusIM/debezium
4. Job 빌드 및 Flink에 Job 등록
5. Flink가 인지하는 Flag 초기화를 위해 Jenkins Trigger를 유발한다.
▷ Trigger (Jenkins pipeline)
더보기
pipeline {
agent any
environment {
TYPE = "${params.SERVER_TYPE}"
}
stages {
stage('Check and Update Flag') {
steps {
script {
def flagFile = "${HOME}/mysql-connector/${TYPE}/lmsflag"
if (fileExists(flagFile)) {
def content = readFile(flagFile).trim()
echo "현재 ${flagFile} 값: ${content}"
if (content == "true") {
writeFile file: flagFile, text: "false"
echo "${flagFile} 값을 flase로 업데이트했습니다."
}
} else {
echo "${flagFile} 파일이 존재하지 않아 새로 생성합니다."
writeFile file: flagFile, text: "false"
}
flagFile = "${HOME}/mysql-connector/${TYPE}/lcmsflag"
if (fileExists(flagFile)) {
def content = readFile(flagFile).trim()
echo "현재 ${flagFile} 값: ${content}"
if (content == "true") {
writeFile file: flagFile, text: "false"
echo "${flagFile} 값을 flase로 업데이트했습니다."
}
} else {
echo "${flagFile} 파일이 존재하지 않아 새로 생성합니다."
writeFile file: flagFile, text: "false"
}
}
}
}
}
}