大数据

Kafka集群启停脚本

#!/bin/bash

#######################################
# Force-kill any leftover Kafka broker process on a remote host and remove
# its lock file.
# Arguments:
#   $1 - host to clean up (handed to ssh). When omitted, the caller's global
#        $i is used instead, which is the only source the function consulted
#        before it honoured its argument.
# Outputs:
#   A progress banner on stdout; an error message on stderr if no host is known.
# Returns:
#   1 if no host could be determined. Otherwise the status of the trailing
#   sleep; failures of the ssh commands are not propagated.
#######################################
force_kill_kafka() {
    local host="${1:-${i:-}}"
    if [ -z "$host" ]; then
        echo "force_kill_kafka: 未指定目标主机" >&2
        return 1
    fi

    echo "----- 强制清理 $host 的Kafka进程 -----"
    # SIGKILL every remote process whose ps line matches kafka.Kafka. Errors
    # are discarded on purpose so a host with no matching process is not
    # treated as a failure.
    ssh "$host" "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9 2>/dev/null || true"
    # Remove the lock file.
    ssh "$host" "rm -f /data/datas/kafka/.lock 2>/dev/null"
    # Short pause before returning (presumably to let the kill take effect).
    sleep 2
}

# Broker hosts operated on by every sub-command, in this order.
# Kept as a space-separated string and expanded unquoted in the loops below
# so that it splits into one word per host.
kafka_hosts="kafka001 kafka002 kafka003"

# Check over ssh whether a kafka.Kafka process exists on host $1.
# Returns the ssh exit status: 0 when a process matched, 1 when none did,
# and 255 when ssh itself failed (e.g. the host could not be reached).
kafka_is_running() {
    ssh "$1" "ps -ef | grep kafka.Kafka | grep -v grep" > /dev/null
}

# Dispatch on the sub-command given as the first script argument.
# NOTE: the loop variable is deliberately named `i`. force_kill_kafka can read
# a global of that name to find its target host, so do not rename it.
case "${1:-}" in
"start")
    for i in $kafka_hosts; do
        echo "----- 启动Kafka $i -----"
        # Clear out any leftover broker process and lock file first.
        force_kill_kafka "$i"
        # Then launch the broker as a daemon.
        ssh "$i" "source /etc/profile; /data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
        sleep 3  # leave a short gap between hosts while the broker starts
    done
    ;;
"stop")
    for i in $kafka_hosts; do
        echo "----- 停止Kafka $i -----"
        # Try a graceful stop first.
        ssh "$i" "source /etc/profile; /data/kafka/bin/kafka-server-stop.sh"
        sleep 5  # wait for the graceful stop to finish

        # Fall back to a forced cleanup if a broker process is still present.
        if kafka_is_running "$i"; then
            echo "检测到残留进程,强制清理..."
            force_kill_kafka "$i"
        fi
    done
    ;;
"force-clean")
    # Force-clean every node without attempting a graceful stop.
    for i in $kafka_hosts; do
        echo "----- 强制清理 $i -----"
        force_kill_kafka "$i"
    done
    ;;
"status")
    # Report per-host broker state.
    for i in $kafka_hosts; do
        printf 'Kafka %s 状态: ' "$i"
        kafka_is_running "$i"
        case $? in
        0)
            echo "运行中"
            ;;
        255)
            # ssh could not run the check at all; do not claim the broker
            # is stopped when its state is actually unknown.
            echo "无法连接"
            ;;
        *)
            echo "已停止"
            ;;
        esac
    done
    ;;
*)
    # Unknown or missing sub-command: print usage on stderr and fail.
    echo "用法: $0 {start|stop|force-clean|status}" >&2
    exit 1
    ;;
esac