¥
立即购买

智能运维脚本生成专家

204 浏览
25 试用
7 购买
Nov 19, 2025更新

本提示词专为IT运维人员设计,能够根据用户输入的服务器类型、运维任务类型和具体配置参数,自动生成专业、安全、高效的运维脚本。通过深度分析用户需求,结合最佳实践和安全性考量,提供可立即执行的脚本代码,显著提升运维效率,降低人为错误风险,确保系统稳定运行。支持多种主流操作系统和常见运维场景,包括系统监控、日志分析、备份恢复、性能优化等任务。

脚本概述

  • 功能描述:

    • 在指定 Linux 主机上以幂等方式安装并启用 Prometheus node_exporter v1.6.x(systemd 托管,独立用户 monitor)
    • 按最小权限原则加固服务(非 root 运行、systemd 硬化)
    • 仅开放 9100/TCP 给 10.0.20.5(优先使用 firewalld/ufw,必要时使用 iptables 规则,记录变更)
    • 部署后进行本地自检(curl /metrics),失败自动回滚(恢复旧二进制与 systemd 配置、撤销防火墙改动、删除临时创建用户等)
    • 保留自定义配置(/etc/node_exporter/flags.env),并以独立文件注入必要过滤参数;重复执行不重复安装
    • 记录全流程日志 /var/log/ops/monitor_install.log,配置 logrotate 保留 7 天
    • 可选生成 Prometheus 告警规则(CPU/内存/磁盘/inode)供 10.0.20.5 使用
  • 适用环境:

    • 目标系统:Linux(基于 systemd 的发行版,如 RHEL/CentOS 7+/8+/9+、Rocky/Alma、Debian/Ubuntu、Amazon Linux 2/2023)
    • 需要可用的包与工具:bash、systemd、curl 或 wget、tar
    • 网络要求:目标主机可访问 GitHub Releases(或自行通过 --download-url 指定内网镜像)
  • 执行权限:

    • 本地控制端:普通用户可执行,但需具备对远程主机的 SSH 免密或可 sudo 权限
    • 远程目标主机:需要 root 权限(建议通过 sudo 提权)

完整代码

#!/usr/bin/env bash
# Node Exporter v1.6.x 安装与安全加固(多主机/本地均可)
# - 安装到 /opt/node_exporter/bin/node_exporter
# - 自定义参数文件:/etc/node_exporter/flags.env(保留用户自定义)
# - 脚本强制参数文件:/etc/node_exporter/ops-required.env(仅添加/覆盖必要过滤)
# - systemd 服务:/etc/systemd/system/node_exporter.service
# - 日志:/var/log/ops/monitor_install.log(logrotate 保留7天)
# - 防火墙仅放行 10.0.20.5:9100,优先 firewalld/ufw,不存在则使用 iptables(尽量最小化影响)
# - 自检失败自动回滚
set -euo pipefail

umask 027

# ===== 默认参数(可被命令行覆盖) =====
TARGETS_DEFAULT="10.0.12.15,10.0.12.16,10.0.12.17"
PROM_IP_DEFAULT="10.0.20.5"
PROM_PORT_DEFAULT="9100"
NODE_EXPORTER_VERSION_DEFAULT="1.6.1"
NET_IF_DEFAULT="eth0"
LOG_FILE_DEFAULT="/var/log/ops/monitor_install.log"
SSH_USER_DEFAULT="root"
SSH_PORT_DEFAULT="22"
SSH_OPTS_DEFAULT="-o BatchMode=yes -o StrictHostKeyChecking=accept-new -o ConnectTimeout=5 -o ServerAliveInterval=30"
DOWNLOAD_URL_DEFAULT=""       # 留空时自动拼接 GitHub Releases
MODE_DEFAULT="remote"         # remote | local
GENERATE_RULES_DEFAULT="false"

# ===== 全局变量(初始化为默认) =====
TARGETS="$TARGETS_DEFAULT"
PROM_IP="$PROM_IP_DEFAULT"
PROM_PORT="$PROM_PORT_DEFAULT"
NODE_EXPORTER_VERSION="$NODE_EXPORTER_VERSION_DEFAULT"
NET_IF="$NET_IF_DEFAULT"
LOG_FILE="$LOG_FILE_DEFAULT"
SSH_USER="$SSH_USER_DEFAULT"
SSH_PORT="$SSH_PORT_DEFAULT"
SSH_OPTS="$SSH_OPTS_DEFAULT"
DOWNLOAD_URL="$DOWNLOAD_URL_DEFAULT"
MODE="$MODE_DEFAULT"
GENERATE_RULES="$GENERATE_RULES_DEFAULT"
DRY_RUN="false"

print_usage() {
  cat <<EOF
用法: $0 [选项]
  -H, --hosts           目标主机列表(逗号分隔),默认: ${TARGETS_DEFAULT}
  -m, --mode            执行模式: remote|local,默认: ${MODE_DEFAULT}
  -u, --ssh-user        SSH 用户,默认: ${SSH_USER_DEFAULT}
  -p, --ssh-port        SSH 端口,默认: ${SSH_PORT_DEFAULT}
  -o, --ssh-opts        追加 SSH 选项(附加到默认值)
  -v, --version         node_exporter 版本,默认: ${NODE_EXPORTER_VERSION_DEFAULT}
  --prom-ip             允许访问 9100 的 Prometheus IP,默认: ${PROM_IP_DEFAULT}
  --prom-port           node_exporter 端口,默认: ${PROM_PORT_DEFAULT}
  --net-if              网卡白名单(netdev include),默认: ${NET_IF_DEFAULT}
  --log-file            安装日志文件,默认: ${LOG_FILE_DEFAULT}
  --download-url        指定 node_exporter 压缩包 URL(可指向内网镜像);留空使用 GitHub
  --dry-run             试运行(不更改远端,仅打印将执行的操作)
  --generate-rules      在本地输出 Prometheus 告警规则 YAML 到标准输出后退出
  -h, --help            显示帮助

示例(远程多主机):
  $0 -H "10.0.12.15,10.0.12.16,10.0.12.17" --prom-ip 10.0.20.5

示例(本地主机):
  sudo $0 -m local --prom-ip 10.0.20.5

示例(生成告警规则):
  $0 --generate-rules > node.rules.yml
EOF
}

log()   { echo "[$(date '+%F %T')] [INFO] $*"; }
warn()  { echo "[$(date '+%F %T')] [WARN] $*" >&2; }
error() { echo "[$(date '+%F %T')] [ERROR] $*" >&2; }

parse_args() {
  while [[ $# -gt 0 ]]; do
    case "$1" in
      -H|--hosts)          TARGETS="$2"; shift 2;;
      -m|--mode)           MODE="$2"; shift 2;;
      -u|--ssh-user)       SSH_USER="$2"; shift 2;;
      -p|--ssh-port)       SSH_PORT="$2"; shift 2;;
      -o|--ssh-opts)       SSH_OPTS="$SSH_OPTS $2"; shift 2;;
      -v|--version)        NODE_EXPORTER_VERSION="$2"; shift 2;;
      --prom-ip)           PROM_IP="$2"; shift 2;;
      --prom-port)         PROM_PORT="$2"; shift 2;;
      --net-if)            NET_IF="$2"; shift 2;;
      --log-file)          LOG_FILE="$2"; shift 2;;
      --download-url)      DOWNLOAD_URL="$2"; shift 2;;
      --dry-run)           DRY_RUN="true"; shift 1;;
      --generate-rules)    GENERATE_RULES="true"; shift 1;;
      -h|--help)           print_usage; exit 0;;
      *) error "未知参数: $1"; print_usage; exit 2;;
    esac
  done
}

generate_rules_yaml() {
  # 生成与需求一致的告警规则(供 10.0.20.5 的 Prometheus 引用)
  # 过滤 tmpfs/docker/snap, 网卡仅计入 eth0 的逻辑在 exporter 端已控制;此处按常规过滤增强鲁棒性
  cat <<'YAML'
groups:
- name: node_basic_alerts
  rules:
  - alert: HighCpuUsage
    expr: |
      avg by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[5m])) > 0.85
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "CPU 使用率高 (instance={{ $labels.instance }})"
      description: "CPU 使用率持续5m超过85%"

  - alert: HighMemoryUsage
    expr: |
      (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) > 0.90
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "内存使用率高 (instance={{ $labels.instance }})"
      description: "内存使用率超过90%"

  - alert: DiskUsageHighVar
    expr: |
      (node_filesystem_size_bytes{mountpoint="/var",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}
       - node_filesystem_free_bytes{mountpoint="/var",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"})
        / node_filesystem_size_bytes{mountpoint="/var",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"} > 0.80
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "/var 磁盘使用率高 (instance={{ $labels.instance }})"
      description: "/var 使用率超过80%"

  - alert: DiskUsageHighHome
    expr: |
      (node_filesystem_size_bytes{mountpoint="/home",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}
       - node_filesystem_free_bytes{mountpoint="/home",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"})
        / node_filesystem_size_bytes{mountpoint="/home",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"} > 0.80
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "/home 磁盘使用率高 (instance={{ $labels.instance }})"
      description: "/home 使用率超过80%"

  - alert: InodeLow
    expr: |
      (node_filesystem_files_free{fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"} 
       / node_filesystem_files{fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}) < 0.10
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "inode 剩余低 (instance={{ $labels.instance }})"
      description: "inode 剩余少于10%"
YAML
}

# ===== 远端执行脚本(通过 SSH 下发) =====
remote_payload() {
  # 变量从环境传入:PROM_IP PROM_PORT NODE_EXPORTER_VERSION NET_IF LOG_FILE DOWNLOAD_URL DRY_RUN
  cat <<'REMOTE_EOF'
set -euo pipefail
umask 027

PROM_IP="${PROM_IP:-10.0.20.5}"
PROM_PORT="${PROM_PORT:-9100}"
NODE_EXPORTER_VERSION="${NODE_EXPORTER_VERSION:-1.6.1}"
NET_IF="${NET_IF:-eth0}"
LOG_FILE="${LOG_FILE:-/var/log/ops/monitor_install.log}"
DOWNLOAD_URL="${DOWNLOAD_URL:-}"
DRY_RUN="${DRY_RUN:-false}"

NE_USER="monitor"
NE_GROUP="monitor"
NE_HOME="/var/lib/node_exporter"
NE_BIN_DIR="/opt/node_exporter/bin"
NE_BIN="${NE_BIN_DIR}/node_exporter"
NE_ETC="/etc/node_exporter"
NE_ENV_CUSTOM="${NE_ETC}/flags.env"
NE_ENV_REQUIRED="${NE_ETC}/ops-required.env"
NE_SERVICE="/etc/systemd/system/node_exporter.service"
NE_STATE="${NE_HOME}/install_state"
BACKUP_DIR_BASE="/opt/node_exporter/backup"
TS="$(date +%Y%m%d%H%M%S)"
BACKUP_DIR="${BACKUP_DIR_BASE}/${TS}"

mkdir -p "$(dirname "${LOG_FILE}")"
touch "${LOG_FILE}"
chmod 0640 "${LOG_FILE}"

exec &> >(tee -a "${LOG_FILE}")

log()   { echo "[$(date '+%F %T')] [INFO] $*"; }
warn()  { echo "[$(date '+%F %T')] [WARN] $*" >&2; }
error() { echo "[$(date '+%F %T')] [ERROR] $*" >&2; }

require_root() {
  if [[ "$(id -u)" -ne 0 ]]; then
    error "需要 root 权限执行"
    exit 1
  fi
}

setup_logrotate() {
  local lr="/etc/logrotate.d/ops-monitor-install"
  if [[ ! -f "$lr" ]]; then
    cat > "$lr" <<EOF
${LOG_FILE} {
  daily
  rotate 7
  missingok
  compress
  notifempty
  copytruncate
}
EOF
    log "创建 logrotate 规则: $lr"
  fi
}

detect_arch() {
  local m="$(uname -m)"
  case "$m" in
    x86_64) echo "amd64" ;;
    aarch64) echo "arm64" ;;
    armv7l) echo "armv7" ;;
    ppc64le) echo "ppc64le" ;;
    s390x) echo "s390x" ;;
    *) error "不支持的架构: $m"; exit 1;;
  esac
}

get_current_version() {
  if [[ -x "${NE_BIN}" ]]; then
    "${NE_BIN}" --version 2>/dev/null | awk 'NR==1{print $3}' || true
  else
    echo ""
  fi
}

download_node_exporter() {
  local arch="$1"
  local ver="$2"
  local url
  local tmpdir="/tmp/node_exporter_${ver}_${arch}_${TS}"
  mkdir -p "$tmpdir"
  if [[ -n "${DOWNLOAD_URL}" ]]; then
    url="${DOWNLOAD_URL}"
  else
    url="https://github.com/prometheus/node_exporter/releases/download/v${ver}/node_exporter-${ver}.linux-${arch}.tar.gz"
  fi
  log "下载 node_exporter ${ver} (${arch}) from ${url}"
  if [[ "${DRY_RUN}" == "true" ]]; then
    echo "[DRY-RUN] curl -fSL ${url} -o ${tmpdir}/node_exporter.tar.gz"
    return 0
  fi
  if command -v curl >/dev/null 2>&1; then
    curl -fSL "${url}" -o "${tmpdir}/node_exporter.tar.gz"
  elif command -v wget >/dev/null 2>&1; then
    wget -O "${tmpdir}/node_exporter.tar.gz" "${url}"
  else
    error "需要 curl 或 wget"
    exit 1
  fi
  tar -C "${tmpdir}" -xzf "${tmpdir}/node_exporter.tar.gz"
  local src="${tmpdir}/node_exporter-${ver}.linux-${arch}/node_exporter"
  if [[ ! -f "${src}" ]]; then
    error "解压后未找到 node_exporter 二进制"
    exit 1
  fi
  mkdir -p "${NE_BIN_DIR}"
  install -m 0755 "${src}" "${NE_BIN}"
}

create_user_group() {
  local created="false"
  if ! getent group "${NE_GROUP}" >/dev/null; then
    if [[ "${DRY_RUN}" == "true" ]]; then
      echo "[DRY-RUN] groupadd --system ${NE_GROUP}"
    else
      groupadd --system "${NE_GROUP}"
    fi
    created="true"
  fi
  local nologin="/usr/sbin/nologin"
  [[ -x "$nologin" ]] || nologin="/sbin/nologin"
  if ! id -u "${NE_USER}" >/dev/null 2>&1; then
    if [[ "${DRY_RUN}" == "true" ]]; then
      echo "[DRY-RUN] useradd --system --home ${NE_HOME} --shell ${nologin} -g ${NE_GROUP} ${NE_USER}"
    else
      useradd --system --home "${NE_HOME}" --shell "${nologin}" -g "${NE_GROUP}" "${NE_USER}"
    fi
    created="true"
  fi
  mkdir -p "${NE_HOME}" "${NE_ETC}"
  chown -R "${NE_USER}:${NE_GROUP}" "${NE_HOME}"
  if [[ "${created}" == "true" ]]; then
    echo "CREATED_USER=1" >> "${NE_STATE}.tmp"
  fi
}

backup_existing() {
  mkdir -p "${BACKUP_DIR}"
  local backed="false"
  if [[ -f "${NE_BIN}" ]]; then
    cp -a "${NE_BIN}" "${BACKUP_DIR}/node_exporter.bak"
    backed="true"
  fi
  if [[ -f "${NE_SERVICE}" ]]; then
    cp -a "${NE_SERVICE}" "${BACKUP_DIR}/node_exporter.service.bak"
    backed="true"
  fi
  if [[ -f "${NE_ENV_CUSTOM}" ]]; then
    cp -a "${NE_ENV_CUSTOM}" "${BACKUP_DIR}/flags.env.bak"
    backed="true"
  fi
  if [[ -f "${NE_ENV_REQUIRED}" ]]; then
    cp -a "${NE_ENV_REQUIRED}" "${BACKUP_DIR}/ops-required.env.bak"
    backed="true"
  fi
  if [[ "${backed}" == "true" ]]; then
    echo "BACKUP_DIR=${BACKUP_DIR}" >> "${NE_STATE}.tmp"
    log "已备份现有文件到 ${BACKUP_DIR}"
  fi
}

write_required_env() {
  # 仅放置本脚本要求的过滤选项,保留用户自定义 flags.env 不覆盖
  # 过滤: 忽略 tmpfs、docker*、snap;网卡白名单 eth0;端口 PROM_PORT
  # 使用 NODE_EXPORTER_REQUIRED_OPTS 变量,systemd 中按 custom + required 顺序传入,required 后置覆盖
  local fs_types='^(tmpfs)$'
  local mp_excl='^(/var/lib/docker/.+|/docker.+|/snap($|/).*)'
  cat > "${NE_ENV_REQUIRED}" <<EOF
# 自动生成(请勿手动修改),自定义请写入 ${NE_ENV_CUSTOM}
NODE_EXPORTER_REQUIRED_OPTS="--web.listen-address=:${PROM_PORT} \\
  --collector.filesystem.fs-types-exclude=${fs_types} \\
  --collector.filesystem.mount-points-exclude=${mp_excl} \\
  --collector.netdev.device-include=^(${NET_IF})$"
EOF
  chown "${NE_USER}:${NE_GROUP}" "${NE_ENV_REQUIRED}"
  chmod 0640 "${NE_ENV_REQUIRED}"
}

ensure_custom_env_exists() {
  # 若用户未创建自定义文件,生成一个模板但不包含任何强制参数
  if [[ ! -f "${NE_ENV_CUSTOM}" ]]; then
    cat > "${NE_ENV_CUSTOM}" <<'EOF'
# 自定义 node_exporter 启动参数(可选)
# 示例:
# NODE_EXPORTER_OPTS="--collector.textfile.directory=/var/lib/node_exporter/textfile_collector"
NODE_EXPORTER_OPTS=""
EOF
    chown "${NE_USER}:${NE_GROUP}" "${NE_ENV_CUSTOM}"
    chmod 0640 "${NE_ENV_CUSTOM}"
  fi
}

write_systemd_unit() {
  local need_reload="false"
  local unit_content="[Unit]
Description=Prometheus Node Exporter
Wants=network-online.target
After=network-online.target

[Service]
User=${NE_USER}
Group=${NE_GROUP}
Type=simple
EnvironmentFile=-${NE_ENV_CUSTOM}
EnvironmentFile=-${NE_ENV_REQUIRED}
ExecStart=${NE_BIN} \$NODE_EXPORTER_OPTS \$NODE_EXPORTER_REQUIRED_OPTS
Restart=on-failure
RestartSec=5s
NoNewPrivileges=true
ProtectSystem=full
ProtectHome=true
PrivateTmp=true
PrivateDevices=true
ProtectKernelTunables=true
ProtectKernelModules=true
LockPersonality=true
RestrictNamespaces=true
RestrictRealtime=true
SystemCallArchitectures=native
UMask=027

[Install]
WantedBy=multi-user.target
"
  if [[ ! -f "${NE_SERVICE}" ]]; then
    echo "${unit_content}" > "${NE_SERVICE}"
    need_reload="true"
    echo "CREATED_SERVICE=1" >> "${NE_STATE}.tmp"
  else
    # 尽量不覆盖已有服务。但若发现不是monitor用户或ExecStart不包含所需环境文件,则进行温和修正(先备份)。
    if ! grep -q "^User=${NE_USER}$" "${NE_SERVICE}" || ! grep -q "${NE_ENV_REQUIRED}" "${NE_SERVICE}"; then
      cp -a "${NE_SERVICE}" "${NE_SERVICE}.${TS}.bak"
      echo "${unit_content}" > "${NE_SERVICE}"
      need_reload="true"
      echo "UPDATED_SERVICE=1" >> "${NE_STATE}.tmp"
      log "已更新 systemd unit 并备份为 ${NE_SERVICE}.${TS}.bak"
    fi
  fi
  chown root:root "${NE_SERVICE}"
  chmod 0644 "${NE_SERVICE}"
  if [[ "${need_reload}" == "true" ]]; then
    systemctl daemon-reload
  fi
}

ensure_permissions() {
  chown -R "${NE_USER}:${NE_GROUP}" "${NE_BIN_DIR}" "${NE_ETC}"
}

start_enable_service() {
  systemctl enable node_exporter >/dev/null 2>&1 || true
  systemctl restart node_exporter
  systemctl is-active --quiet node_exporter
}

# --- 防火墙处理 ---
fw_allow_firewalld() {
  local changed="false"
  if firewall-cmd --state >/dev/null 2>&1; then
    # 移除可能存在的对 9100 的全局放行(谨慎)
    if firewall-cmd --permanent --list-ports | grep -qw "${PROM_PORT}/tcp"; then
      firewall-cmd --permanent --remove-port="${PROM_PORT}/tcp" || true
      changed="true"
      echo "FW_CHG=removed_global_port" >> "${NE_STATE}.tmp"
    fi
    # 添加仅对 PROM_IP 放行
    local rule="rule family=ipv4 source address=${PROM_IP} port protocol=tcp port=${PROM_PORT} accept"
    if ! firewall-cmd --permanent --query-rich-rule="$rule" >/dev/null; then
      firewall-cmd --permanent --add-rich-rule="$rule"
      changed="true"
      echo "FW_CHG=add_allow_${PROM_IP}_${PROM_PORT}" >> "${NE_STATE}.tmp"
    end_if=true
    fi
    if [[ "${changed}" == "true" ]]; then
      firewall-cmd --reload || true
    fi
    return 0
  fi
  return 1
}

fw_allow_ufw() {
  if command -v ufw >/dev/null 2>&1 && ufw status | grep -q "Status: active"; then
    # 删除任何全局 9100 放行
    if ufw status | grep -Eqw "${PROM_PORT}/tcp\s+ALLOW\s+Anywhere"; then
      yes | ufw delete allow "${PROM_PORT}/tcp" || true
      echo "FW_CHG=ufw_del_global" >> "${NE_STATE}.tmp"
    fi
    # 添加仅对 PROM_IP 放行
    if ! ufw status | grep -Eqw "${PROM_PORT}/tcp\s+ALLOW\s+${PROM_IP}"; then
      ufw allow proto tcp from "${PROM_IP}" to any port "${PROM_PORT}"
      echo "FW_CHG=ufw_add_${PROM_IP}_${PROM_PORT}" >> "${NE_STATE}.tmp"
    fi
    ufw reload || true
    return 0
  fi
  return 1
}

fw_allow_iptables() {
  # 仅在 firewalld/ufw 不可用时使用;在 INPUT 链前插入规则:允许 PROM_IP:PORT,随后丢弃其它对该端口的访问
  # 尽量幂等:先查询再插入
  local ipt="iptables"
  local ipt6="ip6tables"
  if ! command -v ${ipt} >/dev/null 2>&1; then
    warn "未检测到 firewalld/ufw/iptables,跳过端口限制(请手工加固)"
    return 1
  fi

  # IPv4
  if ! ${ipt} -C INPUT -p tcp --dport "${PROM_PORT}" -s "${PROM_IP}" -j ACCEPT 2>/dev/null; then
    ${ipt} -I INPUT 1 -p tcp --dport "${PROM_PORT}" -s "${PROM_IP}" -j ACCEPT
    echo "FW_CHG=ipt_v4_accept_${PROM_IP}_${PROM_PORT}" >> "${NE_STATE}.tmp"
  fi
  if ! ${ipt} -C INPUT -p tcp --dport "${PROM_PORT}" -j DROP 2>/dev/null; then
    ${ipt} -I INPUT 2 -p tcp --dport "${PROM_PORT}" -j DROP
    echo "FW_CHG=ipt_v4_drop_${PROM_PORT}" >> "${NE_STATE}.tmp"
  fi

  # IPv6(如不需要可忽略,保持最小改动:仅丢弃该端口)
  if command -v ${ipt6} >/dev/null 2>&1; then
    if ! ${ipt6} -C INPUT -p tcp --dport "${PROM_PORT}" -j DROP 2>/dev/null; then
      ${ipt6} -I INPUT 1 -p tcp --dport "${PROM_PORT}" -j DROP
      echo "FW_CHG=ipt_v6_drop_${PROM_PORT}" >> "${NE_STATE}.tmp"
    fi
  fi

  # 持久化(若可用)
  if command -v netfilter-persistent >/dev/null 2>&1; then
    netfilter-persistent save || true
  elif command -v service >/dev/null 2>&1 && service netfilter-persistent status >/dev/null 2>&1; then
    service netfilter-persistent save || true
  elif command -v iptables-save >/dev/null 2>&1 && [[ -d /etc/iptables ]]; then
    iptables-save > /etc/iptables/rules.v4 || true
    command -v ip6tables-save >/dev/null 2>&1 && ip6tables-save > /etc/iptables/rules.v6 || true
  else
    warn "iptables 规则可能不会持久化,请根据发行版安装持久化工具(如 iptables-persistent)"
  fi
  return 0
}

apply_firewall() {
  log "应用防火墙策略:仅允许 ${PROM_IP} 访问 ${PROM_PORT}/tcp"
  if [[ "${DRY_RUN}" == "true" ]]; then
    echo "[DRY-RUN] 尝试 firewalld/ufw/iptables 依次应用"
    return 0
  fi
  fw_allow_firewalld && return 0
  fw_allow_ufw && return 0
  fw_allow_iptables && return 0
  warn "未成功应用任何防火墙加固,请手工限制对 ${PROM_PORT}/tcp 的访问来源为 ${PROM_IP}"
}

self_check() {
  log "执行自检: curl http://127.0.0.1:${PROM_PORT}/metrics"
  for i in {1..15}; do
    if curl -fsS "http://127.0.0.1:${PROM_PORT}/metrics" | grep -q "^node_exporter_build_info"; then
      log "自检成功"
      return 0
    fi
    sleep 1
  done
  return 1
}

rollback() {
  warn "开始回滚..."
  systemctl stop node_exporter || true

  if [[ -f "${NE_STATE}" ]]; then
    # shellcheck disable=SC1090
    source "${NE_STATE}" || true
  fi
  if [[ -n "${BACKUP_DIR:-}" && -d "${BACKUP_DIR}" ]]; then
    [[ -f "${BACKUP_DIR}/node_exporter.bak" ]] && install -m 0755 "${BACKUP_DIR}/node_exporter.bak" "${NE_BIN}" || rm -f "${NE_BIN}"
    [[ -f "${BACKUP_DIR}/node_exporter.service.bak" ]] && install -m 0644 "${BACKUP_DIR}/node_exporter.service.bak" "${NE_SERVICE}" || rm -f "${NE_SERVICE}"
    [[ -f "${BACKUP_DIR}/flags.env.bak" ]] && install -m 0640 "${BACKUP_DIR}/flags.env.bak" "${NE_ENV_CUSTOM}" || true
    [[ -f "${BACKUP_DIR}/ops-required.env.bak" ]] && install -m 0640 "${BACKUP_DIR}/ops-required.env.bak" "${NE_ENV_REQUIRED}" || rm -f "${NE_ENV_REQUIRED}"
    systemctl daemon-reload || true
  else
    # 无备份:尽力清理我们创建的文件
    rm -f "${NE_ENV_REQUIRED}" || true
  fi

  # 撤销防火墙变更(仅尝试删除我们添加的规则)
  if command -v firewall-cmd >/dev/null 2>&1 && firewall-cmd --state >/dev/null 2>&1; then
    local rule="rule family=ipv4 source address=${PROM_IP} port protocol=tcp port=${PROM_PORT} accept"
    firewall-cmd --permanent --remove-rich-rule="$rule" || true
    firewall-cmd --reload || true
  fi
  if command -v ufw >/dev/null 2>&1 && ufw status | grep -q "Status: active"; then
    yes | ufw delete allow proto tcp from "${PROM_IP}" to any port "${PROM_PORT}" || true
    ufw reload || true
  fi
  if command -v iptables >/dev/null 2>&1; then
    iptables -D INPUT -p tcp --dport "${PROM_PORT}" -s "${PROM_IP}" -j ACCEPT 2>/dev/null || true
    iptables -D INPUT -p tcp --dport "${PROM_PORT}" -j DROP 2>/dev/null || true
    command -v netfilter-persistent >/dev/null 2>&1 && netfilter-persistent save || true
  fi

  # 删除用户(仅当我们创建过且系统中不再需要时)
  if [[ -f "${NE_STATE}" ]] && grep -q "^CREATED_USER=1" "${NE_STATE}"; then
    if id -u "${NE_USER}" >/dev/null 2>&1; then
      userdel "${NE_USER}" || true
    fi
    getent group "${NE_GROUP}" >/dev/null 2>&1 && groupdel "${NE_GROUP}" || true
  fi

  warn "回滚完成"
}

finalize_state() {
  if [[ -f "${NE_STATE}.tmp" ]]; then
    mv "${NE_STATE}.tmp" "${NE_STATE}"
    chown "${NE_USER}:${NE_GROUP}" "${NE_STATE}"
    chmod 0640 "${NE_STATE}"
  fi
}

main_remote() {
  require_root
  setup_logrotate
  : > "${NE_STATE}.tmp"  # 初始化临时状态文件

  # 前置检测
  for b in systemctl tar; do
    command -v "$b" >/dev/null 2>&1 || { error "缺少必需命令: $b"; exit 1; }
  end
  done

  # 版本判断与幂等
  local arch
  arch="$(detect_arch)"
  local cur_ver
  cur_ver="$(get_current_version || true)"
  log "当前版本: ${cur_ver:-<未安装>}  目标版本: ${NODE_EXPORTER_VERSION}"

  backup_existing
  create_user_group

  if [[ -z "${cur_ver}" || ! "${cur_ver}" =~ ^1\.6\.[0-9]+$ ]]; then
    if [[ "${DRY_RUN}" == "true" ]]; then
      echo "[DRY-RUN] 将安装 node_exporter v${NODE_EXPORTER_VERSION} 到 ${NE_BIN}"
    else
      download_node_exporter "${arch}" "${NODE_EXPORTER_VERSION}"
      echo "INSTALLED_NEW_BIN=1" >> "${NE_STATE}.tmp"
    fi
  else
    log "已安装满足要求的 1.6.x,跳过二进制安装"
  fi

  ensure_custom_env_exists
  write_required_env
  write_systemd_unit
  ensure_permissions

  if [[ "${DRY_RUN}" == "true" ]]; then
    echo "[DRY-RUN] 将启动并启用 systemd 服务 node_exporter"
  else
    start_enable_service
  fi

  if [[ "${DRY_RUN}" == "true" ]]; then
    echo "[DRY-RUN] 将应用防火墙限制: 仅 ${PROM_IP} -> ${PROM_PORT}/tcp"
  else
    apply_firewall
  fi

  if [[ "${DRY_RUN}" == "true" ]]; then
    echo "[DRY-RUN] 自检跳过"
    finalize_state
    log "DRY-RUN 完成(未对系统做实际改动)"
    exit 0
  fi

  if ! self_check; then
    error "自检失败,执行回滚"
    rollback
    exit 1
  fi

  finalize_state
  log "安装/配置完成"
}

main_remote
REMOTE_EOF
}

run_remote_host() {
  local host="$1"
  log "====== 处理主机 ${host} ======"
  if [[ "${MODE}" == "local" ]]; then
    # 本地执行(忽略 SSH)
    PROM_IP="${PROM_IP}" PROM_PORT="${PROM_PORT}" NODE_EXPORTER_VERSION="${NODE_EXPORTER_VERSION}" \
    NET_IF="${NET_IF}" LOG_FILE="${LOG_FILE}" DOWNLOAD_URL="${DOWNLOAD_URL}" DRY_RUN="${DRY_RUN}" \
      bash -s <<'EOF'
$(remote_payload)
EOF
    return $?
  fi

  # 远程执行,通过 sudo 切到 root
  if [[ "${DRY_RUN}" == "true" ]]; then
    log "[DRY-RUN] 将通过 SSH 连接 ${SSH_USER}@${host}:${SSH_PORT}"
  fi
  # 将变量以环境导出给远端
  local envs="PROM_IP='${PROM_IP}' PROM_PORT='${PROM_PORT}' NODE_EXPORTER_VERSION='${NODE_EXPORTER_VERSION}' NET_IF='${NET_IF}' LOG_FILE='${LOG_FILE}' DOWNLOAD_URL='${DOWNLOAD_URL}' DRY_RUN='${DRY_RUN}'"
  ssh -p "${SSH_PORT}" ${SSH_OPTS} "${SSH_USER}@${host}" "sudo env ${envs} bash -s" <<'EOF'
$(remote_payload)
EOF
}

main() {
  parse_args "$@"

  if [[ "${GENERATE_RULES}" == "true" ]]; then
    generate_rules_yaml
    exit 0
  fi

  if [[ "${MODE}" != "remote" && "${MODE}" != "local" ]]; then
    error "--mode 必须为 remote 或 local"
    exit 2
  fi

  IFS=',' read -r -a hosts <<< "${TARGETS}"

  if [[ "${MODE}" == "local" ]]; then
    # 本地模式只处理本机,忽略 hosts
    run_remote_host "127.0.0.1"
    exit $?
  fi

  # 远程模式,多主机串行处理(可根据需要改为并行)
  overall_rc=0
  for h in "${hosts[@]}"; do
    h_trim="$(echo "$h" | xargs)"
    [[ -z "$h_trim" ]] && continue
    if ! run_remote_host "$h_trim"; then
      overall_rc=1
      warn "主机 ${h_trim} 处理失败"
    else
      log "主机 ${h_trim} 完成"
    fi
  done
  exit $overall_rc
}

main "$@"

参数说明

  • --hosts/-H
    • 含义:目标主机列表,逗号分隔
    • 默认值:10.0.12.15,10.0.12.16,10.0.12.17
  • --mode/-m
    • 含义:执行模式(remote 通过 SSH 批量部署;local 在本机部署)
    • 默认值:remote
  • --ssh-user/-u
    • 含义:远程 SSH 用户(需具备 sudo 权限)
    • 默认值:root
  • --ssh-port/-p
    • 含义:SSH 端口
    • 默认值:22
  • --ssh-opts/-o
    • 含义:追加 SSH 选项(附加到脚本内置的安全与稳定选项之后)
    • 默认值:空(已有内置选项)
  • --version/-v
    • 含义:node_exporter 版本
    • 默认值:1.6.1(支持 1.6.x)
  • --prom-ip
    • 含义:允许访问 9100 的 Prometheus 服务器地址
    • 默认值:10.0.20.5
  • --prom-port
    • 含义:node_exporter 监听端口
    • 默认值:9100
  • --net-if
    • 含义:网卡白名单,仅采集匹配设备(RE2,默认仅 eth0)
    • 默认值:eth0
  • --log-file
    • 含义:安装与变更日志文件
    • 默认值:/var/log/ops/monitor_install.log
  • --download-url
    • 含义:指定 node_exporter 压缩包 URL(用于内网镜像);留空使用 GitHub 官方地址
    • 默认值:空
  • --dry-run
    • 含义:试运行,打印将执行的动作但不对系统进行改动
    • 默认值:false
  • --generate-rules
    • 含义:在本地输出 Prometheus 告警规则 YAML(CPU/内存/磁盘/inode),不对目标主机做任何操作
    • 默认值:false

使用示例

  • 批量在 prod-web 集群三台主机上安装并加固,允许 10.0.20.5 拉取

    • ./deploy_node_exporter.sh -H "10.0.12.15,10.0.12.16,10.0.12.17" --prom-ip 10.0.20.5
  • 在单台主机本地执行(例如登录到 10.0.12.15 后执行)

    • sudo ./deploy_node_exporter.sh -m local --prom-ip 10.0.20.5
  • 生成 Prometheus 告警规则文件,在 Prometheus 服务器 10.0.20.5 上使用

    • ./deploy_node_exporter.sh --generate-rules > node.rules.yml
    • 将 node.rules.yml 放至 Prometheus 规则目录,并在 prometheus.yml 中通过 rule_files 引用后重载 Prometheus

注意事项

  • 执行前的准备

    • 建议为运维控制端配置对目标主机的 SSH 免密登录(或可交互 sudo)
    • 目标主机需可访问 GitHub Releases(或提供 --download-url 指向内网镜像)
    • 确保目标系统为 systemd 系列,且具备 curl 或 wget、tar
  • 防火墙与最小权限

    • 脚本优先使用 firewalld/ufw 精确放行 10.0.20.5:9100;若均不可用,将使用 iptables 对 9100 端口进行“仅允许单源”的规则控制
    • 为避免对现有策略造成干扰,脚本不会清空/重置整套规则,仅添加必要规则并尝试移除“对 9100 的全局放行”
    • 在仅 iptables 的环境中,如存在复杂策略或默认 ACCEPT,脚本会插入针对 9100 的 ACCEPT/DROP 规则并尝试持久化,但可能不覆盖自定义场景,请审阅 /var/log/ops/monitor_install.log 并按需调整
  • 配置与幂等

    • 用户可在 /etc/node_exporter/flags.env 中加入自定义参数;脚本不会覆盖该文件
    • 必要过滤参数写入 /etc/node_exporter/ops-required.env,并在 systemd 中置于自定义参数之后传入,确保本脚本要求生效同时保留自定义配置
    • 已安装 1.6.x 会跳过二进制更新,仅校验服务与防火墙
  • 自检与回滚

    • 部署后会本地 curl 127.0.0.1:9100/metrics 校验 node_exporter 工作状态
    • 自检失败将自动回滚至部署前状态(恢复备份、删除本脚本添加的防火墙规则与 ops-required.env、必要时删除 monitor 用户)
  • 性能与资源

    • node_exporter 为轻量级常驻进程,默认内存/CPU开销极低;过滤无关挂载点与网卡可进一步减少指标量
    • 日志写入与 logrotate 影响可忽略;首次安装下载二进制会瞬时占用网络与磁盘 IO
  • Prometheus 侧

    • 请在 10.0.20.5 的 prometheus.yml 中新增 targets 指向上述主机的 :9100,并引入本脚本生成的 node.rules.yml(如需告警)
    • 示例 scrape_config(请按实际结构补充):
      • job_name: node static_configs:
        • targets: ["10.0.12.15:9100","10.0.12.16:9100","10.0.12.17:9100"]
  • 安全与合规

    • 脚本不包含任何敏感信息的硬编码;下载源可通过 --download-url 指定企业内网白名单镜像
    • 所有变更(用户/文件/防火墙/systemd)均记录于 /var/log/ops/monitor_install.log,保留 7 天,便于审计
    • 如需进一步加固(如 TLS/Basic Auth),建议在 Prometheus 与 Exporter 前附加反向代理加密或在内网专用网络中部署

脚本概述

  • 功能描述:
    针对 Windows Server 上 IIS W3C 日志进行近24小时的筛选与统计,自动解压压缩日志(.zip/.gz),按条件“sc-status>=500 或 time-taken>=3000ms”过滤,导出明细CSV与Top20摘要CSV,并生成简要可读报告;支持结果目录打包ZIP与7天保留策略,保证只读访问,不中断IIS服务,兼容UTF-8/ANSI。
  • 适用环境:
    • 操作系统:Windows Server 2019/2016/2022(含Windows 10/11)
    • 组件:PowerShell 5.1+(自带),.NET Framework(随系统)
    • IIS W3C 日志路径(可配置):
      C:\inetpub\logs\LogFiles\W3SVC1;C:\inetpub\logs\LogFiles\W3SVC2
  • 执行权限:
    以普通用户或具备对日志目录只读权限的账户执行即可。结果输出目录需具备写权限。

完整代码

<# 
.SYNOPSIS
  IIS W3C日志近24小时过滤与Top20统计,导出CSV与报告,并打包ZIP与保留7天。

.DESCRIPTION
  - 自动处理原始与压缩日志(.log/.zip/.gz),只读访问原始文件。
  - 过滤条件:sc-status >= StatusThreshold(默认500) OR time-taken >= TimeTakenMsThreshold(默认3000)。
  - 时间范围:近 TimeWindowHours 小时(默认24小时),默认按UTC解析W3C时间(可切换为本地)。
  - 导出:明细CSV、Top20摘要CSV、报告txt;目录ZIP打包;7天清理历史。
  - 兼容UTF-8(含BOM)/ANSI(常见ASCII子集),可指定默认编码名。

.NOTES
  作者:运维脚本开发专家
#>

[CmdletBinding()]
param(
    # 日志目录(可多路径)
    [Parameter(Mandatory=$false)]
    [string[]]$LogDirectories = @(
        'C:\inetpub\logs\LogFiles\W3SVC1',
        'C:\inetpub\logs\LogFiles\W3SVC2'
    ),

    # 输出根目录(按日期建立子目录)
    [Parameter(Mandatory=$false)]
    [string]$OutputRoot = 'D:\ops\log_analysis',

    # 时间窗口(小时)
    [Parameter(Mandatory=$false)]
    [int]$TimeWindowHours = 24,

    # 过滤阈值
    [Parameter(Mandatory=$false)]
    [int]$StatusThreshold = 500,
    [Parameter(Mandatory=$false)]
    [int]$TimeTakenMsThreshold = 3000,

    # Top N
    [Parameter(Mandatory=$false)]
    [int]$TopN = 20,

    # 假定日志时间为UTC(W3C默认为UTC)。若IIS配置为本地时间,请设为$false
    [Parameter(Mandatory=$false)]
    [bool]$AssumeUtc = $true,

    # 解压及临时目录
    [Parameter(Mandatory=$false)]
    [string]$TempExtractRoot = "$env:TEMP\iis_log_extract",

    # 文件查找回溯天数(减少扫描无关文件,实际仍按时间字段再过滤)
    [Parameter(Mandatory=$false)]
    [int]$LookbackDaysForFiles = 3,

    # 默认编码名(当无BOM时使用);常见可选:'utf-8','windows-1252','gb18030'
    [Parameter(Mandatory=$false)]
    [string]$DefaultEncodingName = 'utf-8',

    # 结果保留天数
    [Parameter(Mandatory=$false)]
    [int]$RetentionDays = 7,

    # 是否执行保留策略清理(仅限OutputRoot内产物)
    [Parameter(Mandatory=$false)]
    [bool]$EnforceRetention = $true
)

begin {
    Set-StrictMode -Version Latest
    $ErrorActionPreference = 'Stop'

    # 准备输出目录
    $dateTag = (Get-Date -Format 'yyyyMMdd_HHmm')
    $todayTag = (Get-Date -Format 'yyyyMMdd')
    $OutputDir = Join-Path $OutputRoot $dateTag
    if (-not (Test-Path $OutputDir)) { New-Item -ItemType Directory -Path $OutputDir | Out-Null }

    # 加载压缩库
    Add-Type -AssemblyName System.IO.Compression.FileSystem

    # 日志记录函数
    $LogFile = Join-Path $OutputDir "run.log"
    function Write-Log {
        param([string]$Message, [string]$Level = 'INFO')
        $ts = Get-Date -Format 'yyyy-MM-dd HH:mm:ss'
        $line = "[$ts][$Level] $Message"
        $line | Tee-Object -FilePath $LogFile -Append
    }

    Write-Log "脚本启动:IIS日志筛选与统计"
    Write-Log "参数:Dirs=$($LogDirectories -join ';'), OutputRoot=$OutputRoot, Window=${TimeWindowHours}h, AssumeUtc=$AssumeUtc, Thresholds: sc-status>=$StatusThreshold OR time-taken>=$TimeTakenMsThreshold ms"

    # 计算时间窗口
    if ($AssumeUtc) {
        $windowEndUtc   = (Get-Date).ToUniversalTime()
        $windowStartUtc = $windowEndUtc.AddHours(-$TimeWindowHours)
    } else {
        $windowEndLocal   = (Get-Date)
        $windowStartLocal = $windowEndLocal.AddHours(-$TimeWindowHours)
    }

    # 服务器名
    $ServerName = $env:COMPUTERNAME

    # 临时解压目录
    $SessionTempDir = Join-Path $TempExtractRoot $dateTag
    if (-not (Test-Path $SessionTempDir)) { New-Item -ItemType Directory -Path $SessionTempDir | Out-Null }

    # 读文件的StreamReader(自动BOM检测,失败时用默认编码)
    function New-StreamReader {
        param([string]$Path)
        $fs = [System.IO.File]::Open($Path, [System.IO.FileMode]::Open, [System.IO.FileAccess]::Read, [System.IO.FileShare]::ReadWrite)
        try {
            # 首选UTF-8 + BOM检测
            $enc = [System.Text.Encoding]::GetEncoding($DefaultEncodingName)
        } catch {
            $enc = [System.Text.Encoding]::UTF8
        }
        return New-Object System.IO.StreamReader($fs, $enc, $true, 4096, $false)
    }

    # 解压.zip/.gz返回.log文件路径列表
    function Expand-LogArchive {
        param([string]$ArchivePath, [string]$DestRoot)
        $ext = [System.IO.Path]::GetExtension($ArchivePath).ToLowerInvariant()
        $result = @()
        if ($ext -eq '.zip') {
            $dest = Join-Path $DestRoot ([IO.Path]::GetFileNameWithoutExtension($ArchivePath))
            if (-not (Test-Path $dest)) { New-Item -ItemType Directory -Path $dest | Out-Null }
            [System.IO.Compression.ZipFile]::ExtractToDirectory($ArchivePath, $dest, $true)
            $result += Get-ChildItem -LiteralPath $dest -Recurse -File -Include *.log | Select-Object -ExpandProperty FullName
        } elseif ($ext -eq '.gz' -or $ext -eq '.gzip') {
            $baseName = [IO.Path]::GetFileNameWithoutExtension($ArchivePath)
            $dest = Join-Path $DestRoot $baseName
            if (-not (Test-Path $dest)) { New-Item -ItemType Directory -Path $dest | Out-Null }
            $outFile = Join-Path $dest ($baseName -replace '\.log$','' + '.log')
            try {
                $inStream = [System.IO.File]::OpenRead($ArchivePath)
                try {
                    $gzip = New-Object System.IO.Compression.GzipStream($inStream, [IO.Compression.CompressionMode]::Decompress)
                    $outStream = [System.IO.File]::Create($outFile)
                    try {
                        $gzip.CopyTo($outStream)
                    } finally {
                        $outStream.Close()
                        $outStream.Dispose()
                        $gzip.Close()
                        $gzip.Dispose()
                    }
                } finally {
                    $inStream.Close()
                    $inStream.Dispose()
                }
                if (Test-Path $outFile) { $result += $outFile }
            } catch {
                Write-Log "解压GZ失败:$ArchivePath => $($_.Exception.Message)" "WARN"
            }
        }
        return $result
    }

    # 解析W3C日志文件,返回满足时间窗口与条件的明细项;同时更新聚合
    $Aggregations = @{}  # key: "uri|status|ip" => [hashtable] Count, SumTime, MaxTime
    $Details = New-Object System.Collections.Generic.List[object]
    $TotalLines = 0
    $ScannedFiles = 0
    $MatchedLines = 0
    $FilesProcessed = @()

    function Parse-W3C-File {
        param(
            [string]$FilePath,
            [bool]$AssumeUtc
        )
        $ScannedFiles++
        $fieldsMap = @{}
        $hasFields = $false
        $lineNo = 0

        try {
            $reader = New-StreamReader -Path $FilePath
        } catch {
            Write-Log "打开文件失败(跳过):$FilePath => $($_.Exception.Message)" "WARN"
            return
        }

        try {
            while (-not $reader.EndOfStream) {
                $line = $reader.ReadLine()
                $lineNo++

                if ([string]::IsNullOrWhiteSpace($line)) { continue }
                if ($line.StartsWith('#')) {
                    if ($line.StartsWith('#Fields:', [System.StringComparison]::OrdinalIgnoreCase)) {
                        $hasFields = $true
                        $fieldsMap.Clear()
                        # 例:#Fields: date time s-sitename s-computername s-ip cs-method ...
                        $fieldsLine = $line.Substring(8).Trim()
                        $fields = $fieldsLine -split '\s+'
                        for ($i=0; $i -lt $fields.Length; $i++) {
                            $fieldsMap[$fields[$i]] = $i
                        }
                    }
                    continue
                }

                if (-not $hasFields) {
                    # 未出现#Fields:无法解析,跳过
                    continue
                }

                $parts = $line -split '\s+'
                # 安全检查
                if ($parts.Count -lt $fieldsMap.Count) { continue }

                # 读取关键字段(不存在则为缺省)
                $get = {
                    param($name)
                    if ($fieldsMap.ContainsKey($name)) {
                        $idx = $fieldsMap[$name]
                        if ($idx -lt $parts.Count) { return $parts[$idx] }
                    }
                    return $null
                }

                $dateStr = & $get 'date'
                $timeStr = & $get 'time'
                if ([string]::IsNullOrEmpty($dateStr) -or [string]::IsNullOrEmpty($timeStr)) { continue }

                # 解析时间
                try {
                    $style = if ($AssumeUtc) { [System.Globalization.DateTimeStyles]::AssumeUniversal } else { [System.Globalization.DateTimeStyles]::None }
                    $dt = [datetime]::ParseExact("$dateStr $timeStr", 'yyyy-MM-dd HH:mm:ss', [System.Globalization.CultureInfo]::InvariantCulture, $style)
                    if ($AssumeUtc -and $dt.Kind -ne [DateTimeKind]::Utc) {
                        $dt = [datetime]::SpecifyKind($dt, [DateTimeKind]::Utc)
                    }
                } catch {
                    continue
                }

                # 时间窗口判断
                $inWindow = $false
                if ($AssumeUtc) {
                    if ($dt -ge $windowStartUtc -and $dt -le $windowEndUtc) { $inWindow = $true }
                } else {
                    # 解析为本地时间
                    if ($dt -ge $windowStartLocal -and $dt -le $windowEndLocal) { $inWindow = $true }
                }
                if (-not $inWindow) { continue }

                $TotalLines++

                # 拉取其他字段
                $uri     = (& $get 'cs-uri-stem'); if ([string]::IsNullOrEmpty($uri)) { $uri = '-' }
                $cip     = (& $get 'c-ip');        if ([string]::IsNullOrEmpty($cip)) { $cip = '-' }
                $status  = (& $get 'sc-status');   if ([string]::IsNullOrEmpty($status)) { $status = '0' }
                $tt      = (& $get 'time-taken');  if ([string]::IsNullOrEmpty($tt)) { $tt = '0' }

                # 转型
                $statusInt = 0
                [void][int]::TryParse($status, [ref]$statusInt) | Out-Null
                $ttInt = 0
                [void][int]::TryParse($tt, [ref]$ttInt) | Out-Null

                # 过滤条件
                if (($statusInt -ge $StatusThreshold) -or ($ttInt -ge $TimeTakenMsThreshold)) {
                    $MatchedLines++

                    $obj = [pscustomobject]@{
                        server           = $ServerName
                        file             = $FilePath
                        timestamp        = if ($AssumeUtc) { $dt.ToLocalTime().ToString('yyyy-MM-dd HH:mm:ss') + ' (Local)' } else { $dt.ToString('yyyy-MM-dd HH:mm:ss') }
                        date             = $dateStr
                        time             = $timeStr
                        'cs-uri-stem'    = $uri
                        'c-ip'           = $cip
                        'sc-status'      = $statusInt
                        'time-taken-ms'  = $ttInt
                        'cs-method'      = (& $get 'cs-method')
                        'cs-host'        = (& $get 'cs-host')
                        'sc-substatus'   = (& $get 'sc-substatus')
                        'sc-win32-status'= (& $get 'sc-win32-status')
                        'cs(Referer)'    = (& $get 'cs(Referer)')
                        'cs(User-Agent)' = (& $get 'cs(User-Agent)')
                    }
                    $Details.Add($obj) | Out-Null

                    # 聚合
                    $key = "$uri|$statusInt|$cip"
                    if (-not $Aggregations.ContainsKey($key)) {
                        $Aggregations[$key] = [ordered]@{
                            'cs-uri-stem'   = $uri
                            'sc-status'     = $statusInt
                            'c-ip'          = $cip
                            Count           = 0
                            SumTime         = 0
                            MaxTime         = 0
                        }
                    }
                    $agg = $Aggregations[$key]
                    $agg['Count']++
                    $agg['SumTime'] += $ttInt
                    if ($ttInt -gt $agg['MaxTime']) { $agg['MaxTime'] = $ttInt }
                }
            }
        } catch {
            Write-Log "解析文件异常:$FilePath => $($_.Exception.Message)" "WARN"
        } finally {
            $reader.Close()
            $reader.Dispose()
        }
    }
}

process {
    # 收集候选文件(优化:只看近LookbackDaysForFiles修改过的)
    $since = (Get-Date).AddDays(-$LookbackDaysForFiles)
    $candidateFiles = @()

    foreach ($dir in $LogDirectories) {
        if (-not (Test-Path $dir)) {
            Write-Log "日志目录不存在(跳过):$dir" "WARN"
            continue
        }
        # 原始log
        $candidateFiles += Get-ChildItem -LiteralPath $dir -File -Include *.log -ErrorAction SilentlyContinue | Where-Object { $_.LastWriteTime -ge $since }

        # 压缩包
        $archives = Get-ChildItem -LiteralPath $dir -File -Include *.zip, *.gz, *.gzip -ErrorAction SilentlyContinue | Where-Object { $_.LastWriteTime -ge $since }
        foreach ($a in $archives) {
            try {
                $expanded = Expand-LogArchive -ArchivePath $a.FullName -DestRoot $SessionTempDir
                if ($expanded.Count -gt 0) { $candidateFiles += $expanded }
                Write-Log "解压:$($a.FullName) => $($expanded.Count) 个.log"
            } catch {
                Write-Log "解压失败:$($a.FullName) => $($_.Exception.Message)" "WARN"
            }
        }
    }

    $candidateFiles = $candidateFiles | Select-Object -Unique
    Write-Log "候选日志文件数:$($candidateFiles.Count)"

    foreach ($f in $candidateFiles) {
        try {
            Parse-W3C-File -FilePath $f -AssumeUtc:$AssumeUtc
            $FilesProcessed += $f
        } catch {
            Write-Log "文件处理异常:$f => $($_.Exception.Message)" "WARN"
        }
    }

    # 生成结果
    $detailCsv = Join-Path $OutputDir "details_$($todayTag).csv"
    $summaryCsv = Join-Path $OutputDir "summary_top$TopN`_$($todayTag).csv"
    $reportTxt = Join-Path $OutputDir "report_$($todayTag).txt"

    # 明细导出
    $Details | Export-Csv -NoTypeInformation -Path $detailCsv -Encoding UTF8

    # 聚合并TopN
    $Summary = foreach ($kv in $Aggregations.GetEnumerator()) {
        $val = $kv.Value
        [pscustomobject]@{
            'cs-uri-stem'       = $val['cs-uri-stem']
            'sc-status'         = $val['sc-status']
            'c-ip'              = $val['c-ip']
            Count               = $val['Count']
            AvgTimeTaken_ms     = [math]::Round(($val['SumTime'] / [Math]::Max(1,$val['Count'])),2)
            MaxTimeTaken_ms     = $val['MaxTime']
        }
    }

    $TopSummary = $Summary | Sort-Object -Property @{Expression='Count';Descending=$true}, @{Expression='AvgTimeTaken_ms';Descending=$true} | Select-Object -First $TopN
    $TopSummary | Export-Csv -NoTypeInformation -Path $summaryCsv -Encoding UTF8

    # 生成报告
    $totalFiles = $FilesProcessed.Count
    $totalCandidates = $candidateFiles.Count
    $distinctUris = ($Details | Select-Object -ExpandProperty 'cs-uri-stem' -Unique | Measure-Object).Count
    $distinctIPs = ($Details | Select-Object -ExpandProperty 'c-ip' -Unique | Measure-Object).Count

    $timeWindowDesc = if ($AssumeUtc) {
        "UTC: $($windowStartUtc.ToString('yyyy-MM-dd HH:mm:ss')) ~ $($windowEndUtc.ToString('yyyy-MM-dd HH:mm:ss'))(以本地显示:近${TimeWindowHours}小时)"
    } else {
        "Local: $($windowStartLocal.ToString('yyyy-MM-dd HH:mm:ss')) ~ $($windowEndLocal.ToString('yyyy-MM-dd HH:mm:ss'))(近${TimeWindowHours}小时)"
    }

    $topPreview = ($TopSummary | Select-Object -First ([Math]::Min(10,$TopSummary.Count)))
    $topLines = @("Top项预览(最多10条):")
    foreach ($t in $topPreview) {
        $topLines += (" - {0} | sc-status={1} | c-ip={2} | Count={3} | Avg={4}ms | Max={5}ms" -f $t.'cs-uri-stem', $t.'sc-status', $t.'c-ip', $t.Count, $t.AvgTimeTaken_ms, $t.MaxTimeTaken_ms)
    }

    @"
IIS 日志分析报告
================
服务器:$ServerName
时间窗口:$timeWindowDesc
日志目录:$(($LogDirectories -join '; '))
候选文件数:$totalCandidates(实际处理:$totalFiles)
扫描总行数:$TotalLines
匹配行数(sc-status>=$StatusThreshold 或 time-taken>=$TimeTakenMsThreshold):$MatchedLines
Distinct URI:$distinctUris
Distinct IP:$distinctIPs

导出文件:
 - 明细CSV:$detailCsv
 - Top$TopN 摘要CSV:$summaryCsv

$($topLines -join "`r`n")
"@ | Set-Content -Path $reportTxt -Encoding UTF8

    Write-Log "明细已导出:$detailCsv"
    Write-Log "摘要已导出:$summaryCsv"
    Write-Log "报告已生成:$reportTxt"

    # 打包ZIP
    $zipPath = "$OutputDir.zip"
    if (Test-Path $zipPath) { Remove-Item -LiteralPath $zipPath -Force }
    Compress-Archive -Path (Join-Path $OutputDir '*') -DestinationPath $zipPath -Force
    Write-Log "结果已打包:$zipPath"

    # 清理历史(仅OutputRoot下)
    if ($EnforceRetention -and (Test-Path $OutputRoot)) {
        $deadline = (Get-Date).AddDays(-$RetentionDays)
        $toDelete = @()
        $toDelete += Get-ChildItem -LiteralPath $OutputRoot -Directory -ErrorAction SilentlyContinue | Where-Object { $_.LastWriteTime -lt $deadline }
        $toDelete += Get-ChildItem -LiteralPath $OutputRoot -File -Filter '*.zip' -ErrorAction SilentlyContinue | Where-Object { $_.LastWriteTime -lt $deadline }
        foreach ($item in $toDelete) {
            try {
                Remove-Item -LiteralPath $item.FullName -Recurse -Force -ErrorAction Stop
                Write-Log "已清理历史产物:$($item.FullName)"
            } catch {
                Write-Log "清理失败:$($item.FullName) => $($_.Exception.Message)" "WARN"
            }
        }
    }

    Write-Log "任务完成。"
}

end {
    # 清理会话临时解压目录(不影响原日志)
    try {
        if (Test-Path $SessionTempDir) {
            Remove-Item -LiteralPath $SessionTempDir -Recurse -Force -ErrorAction Stop
            Write-Log "会话临时目录已清理:$SessionTempDir"
        }
    } catch {
        Write-Log "清理临时目录失败(可忽略):$($_.Exception.Message)" "WARN"
    }
}

参数说明

  • LogDirectories

    • 含义:IIS W3C日志目录列表
    • 默认值:C:\inetpub\logs\LogFiles\W3SVC1;C:\inetpub\logs\LogFiles\W3SVC2
    • 取值范围:存在且可读的目录路径数组
  • OutputRoot

    • 含义:结果输出根目录(会按日期创建子目录)
    • 默认值:D:\ops\log_analysis
    • 取值范围:可写目录
  • TimeWindowHours

    • 含义:分析近N小时(默认24小时)
    • 默认值:24
    • 取值范围:正整数
  • StatusThreshold

    • 含义:HTTP状态码阈值(含等于,默认500)
    • 默认值:500
    • 取值范围:100-599之间整数
  • TimeTakenMsThreshold

    • 含义:耗时阈值(ms,含等于,默认3000ms)
    • 默认值:3000
    • 取值范围:非负整数
  • TopN

    • 含义:摘要Top条目数量
    • 默认值:20
    • 取值范围:正整数
  • AssumeUtc

    • 含义:是否将日志时间视为UTC(W3C默认UTC);若IIS配置为本地时间,请设为$false
    • 默认值:$true
    • 取值范围:$true/$false
  • TempExtractRoot

    • 含义:压缩日志的临时解压根目录
    • 默认值:%TEMP%\iis_log_extract
    • 取值范围:可写目录
  • LookbackDaysForFiles

    • 含义:扫描文件的修改时间回溯天数,用于减少候选文件数量(最终仍按日期字段过滤)
    • 默认值:3
    • 取值范围:正整数
  • DefaultEncodingName

    • 含义:当文件无BOM时采用的默认编码名
    • 默认值:utf-8
    • 取值范围:任意系统支持编码(如 windows-1252、gb18030)
  • RetentionDays

    • 含义:结果保留天数
    • 默认值:7
    • 取值范围:正整数
  • EnforceRetention

    • 含义:是否执行保留策略清理(仅限OutputRoot内产物)
    • 默认值:$true
    • 取值范围:$true/$false

使用示例

  1. 立即按默认参数执行(适用于 srv-gw01/srv-gw02 本机)
  • 在提升或普通PowerShell中执行(需对日志有读权限,对输出目录有写权限) powershell -ExecutionPolicy Bypass -File .\IIS_Log_Analyzer.ps1
  1. 指定本地时间模式(若IIS日志配置使用本地时间) powershell -ExecutionPolicy Bypass -File .\IIS_Log_Analyzer.ps1 -AssumeUtc:$false -TimeWindowHours 24

  2. 每小时计划任务示例(从当前脚本路径每小时运行一次) $action = New-ScheduledTaskAction -Execute 'powershell.exe' -Argument '-NoProfile -ExecutionPolicy Bypass -File "D:\ops\scripts\IIS_Log_Analyzer.ps1"' $trigger = New-ScheduledTaskTrigger -Once -At (Get-Date).AddMinutes(2) -RepetitionInterval (New-TimeSpan -Hours 1) -RepetitionDuration ([TimeSpan]::MaxValue) Register-ScheduledTask -TaskName 'IIS_Log_Analyzer_Hourly' -Action $action -Trigger $trigger -Description 'IIS日志分析每小时执行' -User 'SYSTEM' -RunLevel Limited

注:若需自定义输出目录或日志目录,可在Argument中加入 -OutputRoot 与 -LogDirectories 参数。

注意事项

  • 执行前的准备

    • 确认日志目录路径正确,当前账户对日志有读取权限,对输出目录有写入权限。
    • 如IIS日志时间使用本地时间,务必将参数 -AssumeUtc 设为 $false,确保时间窗口正确。
    • 若历史日志被压缩为.zip/.gz,请确保磁盘有足够空间用于临时解压(默认在%TEMP%下)。
  • 安全与合规

    • 脚本仅以只读方式访问日志文件,打开文件使用共享读取,不会中断IIS服务。
    • 不在脚本中硬编码任何敏感信息;仅在OutputRoot目录下进行ZIP打包与清理。
  • 兼容性与编码

    • 通过StreamReader启用BOM自动检测,且可通过 -DefaultEncodingName 指定无BOM时默认编码(默认utf-8)。常见IIS W3C日志为ASCII/UTF-8,通常无需更改。
    • 对压缩日志(.zip/.gz/.gzip)自动解压至临时目录后处理。
  • 性能影响与资源消耗

    • 文件扫描默认回溯3天修改时间以减少候选文件;实际过滤仍以日志内date/time字段为准。
    • 脚本为流式读取逐行解析,内存主要消耗在匹配明细的集合与Top聚合。若近24小时日志量巨大,可考虑缩小时间窗口或先按天分割处理。
    • 临时解压会短时占用磁盘空间,脚本结束后自动清理会话临时目录。
  • 结果保留与清理

    • 仅清理 OutputRoot 下超过 RetentionDays 的目录与zip文件,不触碰原始日志目录。
    • 如需保留所有历史产物,可将 -EnforceRetention 设为 $false。

如需将服务器名固定为 srv-gw01 或 srv-gw02 可在报告汇总时使用 -ServerName 自行扩展(脚本当前自动读取本机计算机名)。

脚本概述

  • 功能描述:
    • 将 macOS 构建机指定目录进行“项目级”归档备份,支持每周全量、其余日增量。
    • 备份输出为 per-project 的 .tar.gz 文件,生成并校验 sha256 摘要。
    • 支持保留最近 7 天快照并自动清理超期。
    • 支持按日期恢复,支持 dry-run 模式与权限保留;增量恢复包含删除变更(按 deletions 列表)。
    • 备份前后支持执行构建 Agent 停止/恢复钩子。
    • 通过环境变量读取 SMB 凭证,自动挂载/卸载远端共享。
    • 提供失败重试(最多重试 2 次)、日志记录(/var/log/op_backup.log),可安装为 launchd 定时任务(每日 02:30)。
  • 适用环境:
    • 操作系统:macOS 12+(Monterey 及以上)
    • 需要 Python 3.8+ 环境
    • SMB 共享可达且可挂载
  • 执行权限:
    • 建议以 root/sudo 运行(写 /var/log、挂载/卸载、权限还原等需要高权限)

完整代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
op_backup.py - macOS 构建机增量/全量项目级备份与恢复工具

主要特性:
- 每周一次全量(自动判断距上次全量 >= 6 天或 --force-full),其余为增量
- 备份对象:/Users/build/Projects 下每个项目目录(一级子目录)与 /Users/build/.ssh
- 排除:**/DerivedData/**, **/Library/Caches/**, **/*.tmp, **/node_modules/**
- 备份输出:per-project tar.gz;写入 sha256 并立即校验
- 快照保留:最近 N(7) 天
- 恢复:指定日期(YYYYMMDD);dry-run;可保留权限与时间戳;处理删除集
- SMB 凭证:env BACKUP_USER/BACKUP_PASS;通过 AppleScript 安全地从 stdin 传参挂载
- 钩子:备份前停止 Agent,完成后恢复(支持 --agent-label 或自定义命令)
- 日志:/var/log/op_backup.log;失败重试 2 次(总 3 次尝试)
- 定时:支持安装 launchd plist(每日 02:30)

安全说明:
- 不在脚本中硬编码任何敏感信息;凭证从环境变量或外部 env 文件加载
- 删除与清理仅在目标备份根目录下进行,路径校验防误删
"""

import argparse
import datetime as dt
import fnmatch
import hashlib
import json
import logging
import os
import pathlib
import platform
import shutil
import socket
import subprocess
import sys
import tarfile
import tempfile
import time
from typing import Dict, List, Tuple, Iterable, Optional

# =========================
# 全局默认配置(可通过参数覆盖)
# =========================

DEFAULT_SOURCES = [
    "/Users/build/Projects",
    "/Users/build/.ssh",
]

DEFAULT_EXCLUDES = [
    "**/DerivedData/**",
    "**/Library/Caches/**",
    "**/*.tmp",
    "**/node_modules/**",
]

DEFAULT_SMB_URL = "smb://10.0.30.20/backup/build"
DEFAULT_MOUNT_POINT = "/Volumes/build_backup"
DEFAULT_LOG_PATH = "/var/log/op_backup.log"
DEFAULT_RETENTION_DAYS = 7
DEFAULT_FULL_INTERVAL_DAYS = 7  # 每周一次全量(≥6天未全量则全量)
DEFAULT_RETRIES = 2  # 失败重试次数
SCRIPT_LABEL = "com.ops.backup"

# =========================
# 日志
# =========================

def setup_logger(log_path: str) -> logging.Logger:
    logger = logging.getLogger("op_backup")
    logger.setLevel(logging.INFO)
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    # 文件日志
    try:
        fh = logging.FileHandler(log_path)
        fh.setFormatter(fmt)
        fh.setLevel(logging.INFO)
        logger.addHandler(fh)
    except Exception:
        # 回退到 stdout
        sh = logging.StreamHandler(sys.stdout)
        sh.setFormatter(fmt)
        logger.addHandler(sh)
        logger.warning("无法写入日志文件 %s,已回退到标准输出", log_path)
    return logger


logger = setup_logger(DEFAULT_LOG_PATH)

# =========================
# 工具函数
# =========================

def retry(max_retries: int, pause_seconds: float = 3.0):
    def deco(func):
        def wrapper(*args, **kwargs):
            attempt = 0
            last_exc = None
            while attempt <= max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exc = e
                    attempt += 1
                    if attempt > max_retries:
                        logger.error("函数 %s 最终失败:%s", func.__name__, e)
                        raise
                    logger.warning("函数 %s 失败(第 %d 次),%ss 后重试... 错误:%s",
                                   func.__name__, attempt, pause_seconds, e)
                    time.sleep(pause_seconds)
        return wrapper
    return deco


def load_env_file(env_file: Optional[str]):
    if not env_file:
        return
    p = pathlib.Path(env_file)
    if not p.exists():
        raise FileNotFoundError(f"env 文件不存在:{env_file}")
    for line in p.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        os.environ[k.strip()] = v.strip()


def ensure_dir(path: pathlib.Path):
    path.mkdir(parents=True, exist_ok=True)


def sha256_file(path: pathlib.Path, chunk: int = 2 * 1024 * 1024) -> str:
    h = hashlib.sha256()
    with path.open("rb") as f:
        while True:
            b = f.read(chunk)
            if not b:
                break
            h.update(b)
    return h.hexdigest()


def is_mounted(mount_point: str) -> bool:
    try:
        out = subprocess.check_output(["mount"], text=True)
        for line in out.splitlines():
            if f" on {mount_point} " in line and "smbfs" in line:
                return True
    except Exception:
        pass
    return False


def safe_join(base: pathlib.Path, rel: str) -> pathlib.Path:
    # 防止 路径穿越
    final = (base / rel).resolve()
    if not str(final).startswith(str(base.resolve())):
        raise ValueError(f"越界路径:{rel}")
    return final


def date_str(d: Optional[dt.date] = None) -> str:
    d = d or dt.date.today()
    return d.strftime("%Y%m%d")


def list_subdirs(path: pathlib.Path) -> List[pathlib.Path]:
    return [p for p in path.iterdir() if p.is_dir()]


def parse_date_dir_name(name: str) -> Optional[dt.date]:
    try:
        return dt.datetime.strptime(name, "%Y%m%d").date()
    except Exception:
        return None


def now_local() -> dt.datetime:
    return dt.datetime.now()


def hostname() -> str:
    try:
        return socket.gethostname()
    except Exception:
        return platform.node() or "unknown-host"


# =========================
# 排除规则
# =========================

def normalize_rel(path: pathlib.Path, base: pathlib.Path) -> str:
    rel = path.relative_to(base).as_posix()
    return rel if rel != "." else ""


def should_exclude(rel_path: str, exclude_patterns: List[str]) -> bool:
    # rel_path 使用 posix 风格
    if rel_path is None:
        return False
    rel = rel_path if rel_path else ""
    # 目录以 / 结尾,以便 **/dir/** 命中
    candidates = {rel, rel + "/" if not rel.endswith("/") else rel}
    for pat in exclude_patterns:
        # 统一使用 posix 风格
        if any(fnmatch.fnmatch(c, pat) for c in candidates):
            return True
    return False


# =========================
# SMB 挂载/卸载
# =========================

class SMBMounter:
    def __init__(self, smb_url: str, mount_point: str):
        self.smb_url = smb_url
        self.mount_point = mount_point

    def mount(self, user: Optional[str], password: Optional[str]):
        mp = pathlib.Path(self.mount_point)
        ensure_dir(mp)
        if is_mounted(self.mount_point):
            logger.info("SMB 已挂载:%s", self.mount_point)
            return
        if not user or not password:
            raise RuntimeError("挂载 SMB 需要 BACKUP_USER/BACKUP_PASS 环境变量或 --env-file 提供")
        # 通过 AppleScript 从 stdin 传参,避免密码出现在命令行
        applescript = f'''
            on run
                mount volume "{self.smb_url}" as user name "{user}" with password "{password}"
            end run
        '''
        try:
            logger.info("尝试挂载 SMB:%s -> %s", self.smb_url, self.mount_point)
            subprocess.check_call(["osascript", "-"], input=applescript.encode("utf-8"))
            # 等待 Finder 挂载到 /Volumes/<ShareName>,然后将其绑定/链接到指定 mount_point
            time.sleep(2.0)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"SMB 挂载失败:{e}")

        # macOS Finder 通常挂载到 /Volumes/<ShareName>,但我们需要稳定路径:
        # 如果用户指定了自定义 mount_point,尝试将其软链接到 Finder 挂载点。
        if not is_mounted(self.mount_point):
            # 查找最新 smbfs 挂载点
            out = subprocess.check_output(["mount"], text=True)
            mounted = None
            for line in out.splitlines():
                if "smbfs on /Volumes/" in line:
                    mounted = line.split(" on ")[1].split(" (")[0].strip()
                    break
            if mounted:
                try:
                    if mp.exists() and not mp.is_dir():
                        raise RuntimeError(f"mount_point 不是目录:{mp}")
                    if mp.exists():
                        # 若已有同名目录,清空后重建软链接
                        if mp.is_symlink() or mp.is_dir():
                            try:
                                os.unlink(mp) if mp.is_symlink() else None
                            except Exception:
                                pass
                    ensure_dir(mp.parent)
                    if not mp.exists():
                        os.symlink(mounted, mp)
                    logger.info("SMB 已通过符号链接映射到 %s -> %s", mp, mounted)
                except Exception as e:
                    logger.warning("创建符号链接失败:%s", e)

        if not is_mounted(self.mount_point):
            logger.warning("无法确认 %s 已挂载,但 Finder 可能已挂载成功。", self.mount_point)

    def unmount(self):
        if not is_mounted(self.mount_point):
            return
        try:
            logger.info("卸载 SMB:%s", self.mount_point)
            subprocess.check_call(["diskutil", "unmount", "force", self.mount_point])
        except Exception as e:
            logger.warning("卸载失败(忽略):%s", e)


# =========================
# 备份/恢复核心
# =========================

class BackupManager:
    def __init__(
        self,
        sources: List[str],
        exclude_patterns: List[str],
        mount_point: str,
        smb_url: str,
        retention_days: int,
        full_interval_days: int,
        agent_label: Optional[str] = None,
        pre_stop_cmd: Optional[str] = None,
        post_start_cmd: Optional[str] = None,
        host_name: Optional[str] = None,
    ):
        self.sources = [pathlib.Path(s).resolve() for s in sources]
        self.exclude_patterns = exclude_patterns
        self.mount_point = pathlib.Path(mount_point)
        self.smb_url = smb_url
        self.retention_days = retention_days
        self.full_interval_days = full_interval_days
        self.agent_label = agent_label
        self.pre_stop_cmd = pre_stop_cmd
        self.post_start_cmd = post_start_cmd
        self.host_name = host_name or hostname()

        # 备份目录结构:
        # /Volumes/build_backup/<host>/
        #   snapshots/YYYYMMDD/
        #      TYPE: FULL | INCR
        #      <project>-<type>-YYYYMMDD.tar.gz
        #      <project>-<type>-YYYYMMDD.tar.gz.sha256
        #      deletions-<project>-YYYYMMDD.txt (增量时可能存在)
        #   meta/manifests/<project>.json  (最新清单)
        self.base_dir = self.mount_point / self.host_name
        self.snapshots_dir = self.base_dir / "snapshots"
        self.meta_dir = self.base_dir / "meta" / "manifests"

    def _project_list(self) -> List[Tuple[str, pathlib.Path]]:
        projects: List[Tuple[str, pathlib.Path]] = []
        for src in self.sources:
            if not src.exists():
                logger.warning("源目录不存在:%s(跳过)", src)
                continue
            if src.name == ".ssh" and src.is_dir():
                projects.append(("ssh", src))
                continue
            if src.name == "Projects" and src.is_dir():
                for child in sorted(src.iterdir()):
                    if child.is_dir():
                        projects.append((child.name, child))
            else:
                # 其它单独目录作为一个项目
                projects.append((src.name, src))
        return projects

    def _load_manifest(self, project: str) -> Dict:
        p = self.meta_dir / f"{project}.json"
        if not p.exists():
            return {"project": project, "files": {}, "timestamp": ""}
        try:
            return json.loads(p.read_text(encoding="utf-8"))
        except Exception:
            logger.warning("读取 manifest 失败:%s(重置为空)", p)
            return {"project": project, "files": {}, "timestamp": ""}

    def _save_manifest(self, project: str, manifest: Dict):
        ensure_dir(self.meta_dir)
        p = self.meta_dir / f"{project}.json"
        p.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")

    def _walk_files(self, base: pathlib.Path) -> Iterable[Tuple[str, os.stat_result]]:
        # 遍历文件并应用排除规则;返回 (rel_path, stat)
        for root, dirs, files in os.walk(base, followlinks=False):
            root_p = pathlib.Path(root)
            # 目录级排除(修改 dirs 就能剪枝)
            keep_dirs = []
            for d in dirs:
                rel = normalize_rel(root_p / d, base)
                if should_exclude(rel + "/", self.exclude_patterns):
                    continue
                keep_dirs.append(d)
            dirs[:] = keep_dirs
            # 文件
            for f in files:
                p = root_p / f
                rel = normalize_rel(p, base)
                if should_exclude(rel, self.exclude_patterns):
                    continue
                try:
                    st = p.lstat()
                except FileNotFoundError:
                    continue
                yield rel, st

    def _scan_current_state(self, base: pathlib.Path) -> Dict[str, Dict]:
        state: Dict[str, Dict] = {}
        for rel, st in self._walk_files(base):
            state[rel] = {"mtime": st.st_mtime, "size": st.st_size}
        return state

    def _compute_delta(
        self, prev: Dict[str, Dict], current: Dict[str, Dict]
    ) -> Tuple[List[str], List[str]]:
        changed: List[str] = []
        for rel, meta in current.items():
            pv = prev.get(rel)
            if not pv or pv.get("mtime") != meta["mtime"] or pv.get("size") != meta["size"]:
                changed.append(rel)
        deletions: List[str] = []
        for rel in prev:
            if rel not in current:
                # 只有当此前文件不属于排除时才记录删除(避免因排除导致虚假删除)
                if not should_exclude(rel, self.exclude_patterns):
                    deletions.append(rel)
        return sorted(changed), sorted(deletions)

    def _need_full(self) -> bool:
        # 若最近一次 FULL 距今 >= full_interval_days-1 则做全量;否则增量
        if not self.snapshots_dir.exists():
            return True
        latest_full_date: Optional[dt.date] = None
        for d in list_subdirs(self.snapshots_dir):
            ds = parse_date_dir_name(d.name)
            if not ds:
                continue
            type_file = d / "TYPE"
            if type_file.exists():
                try:
                    t = type_file.read_text(encoding="utf-8").strip().upper()
                    if t == "FULL":
                        if (latest_full_date is None) or (ds > latest_full_date):
                            latest_full_date = ds
                except Exception:
                    continue
        if latest_full_date is None:
            return True
        delta_days = (dt.date.today() - latest_full_date).days
        return delta_days >= (self.full_interval_days - 1)

    def _snapshot_dir(self, day: dt.date) -> pathlib.Path:
        return self.snapshots_dir / day.strftime("%Y%m%d")

    def _write_type_marker(self, snap_dir: pathlib.Path, snap_type: str):
        (snap_dir / "TYPE").write_text(snap_type.upper(), encoding="utf-8")

    def _archive_project(
        self,
        project: str,
        base_dir: pathlib.Path,
        rel_list: List[str],
        snap_dir: pathlib.Path,
        snap_type: str,
        day: dt.date,
    ) -> Tuple[pathlib.Path, pathlib.Path]:
        ensure_dir(snap_dir)
        tar_name = f"{project}-{snap_type.lower()}-{day.strftime('%Y%m%d')}.tar.gz"
        tar_path = snap_dir / tar_name
        sha_path = snap_dir / f"{tar_name}.sha256"

        logger.info("打包 %s(%s,%d 个条目)-> %s", project, snap_type, len(rel_list), tar_path)

        # 创建 tar.gz
        with tarfile.open(tar_path, mode="w:gz", format=tarfile.PAX_FORMAT, compresslevel=6) as tf:
            for rel in rel_list:
                abs_path = safe_join(base_dir, rel)
                # 加入档案,保留相对路径结构
                try:
                    tf.add(abs_path, arcname=rel, recursive=False)
                except FileNotFoundError:
                    # 避免并发写导致的短暂缺失
                    logger.warning("文件消失(忽略):%s", abs_path)

        # 计算 sha256 并写入
        digest = sha256_file(tar_path)
        sha_path.write_text(f"{digest}  {tar_name}\n", encoding="utf-8")

        # 立刻重新校验一次
        digest2 = sha256_file(tar_path)
        if digest != digest2:
            raise RuntimeError(f"SHA256 校验不一致:{tar_path}")

        logger.info("归档与校验完成:%s (sha256=%s)", tar_path.name, digest)
        return tar_path, sha_path

    def _write_deletions(
        self, project: str, deletions: List[str], snap_dir: pathlib.Path, day: dt.date
    ) -> Optional[pathlib.Path]:
        if not deletions:
            return None
        path = snap_dir / f"deletions-{project}-{day.strftime('%Y%m%d')}.txt"
        path.write_text("\n".join(deletions) + "\n", encoding="utf-8")
        logger.info("记录删除集:%s(%d 条)", path.name, len(deletions))
        return path

    def _run_hook_stop(self):
        if self.pre_stop_cmd:
            logger.info("执行预停止命令:%s", self.pre_stop_cmd)
            subprocess.check_call(self.pre_stop_cmd, shell=True)
        elif self.agent_label:
            # 尝试停止构建 Agent(可能需 root)
            logger.info("停止构建 Agent(launchctl stop):%s", self.agent_label)
            subprocess.call(["launchctl", "stop", self.agent_label])
        else:
            logger.info("未配置停止钩子,跳过")

    def _run_hook_start(self):
        if self.post_start_cmd:
            logger.info("执行恢复命令:%s", self.post_start_cmd)
            subprocess.check_call(self.post_start_cmd, shell=True)
        elif self.agent_label:
            logger.info("恢复构建 Agent(launchctl start):%s", self.agent_label)
            subprocess.call(["launchctl", "start", self.agent_label])
        else:
            logger.info("未配置恢复钩子,跳过")

    def _cleanup_retention(self):
        if not self.snapshots_dir.exists():
            return
        entries = list_subdirs(self.snapshots_dir)
        items: List[Tuple[dt.date, pathlib.Path]] = []
        for d in entries:
            ds = parse_date_dir_name(d.name)
            if ds:
                items.append((ds, d))
        items.sort()
        # 保留最近 retention_days 天
        keep_after = (dt.date.today() - dt.timedelta(days=self.retention_days - 1))
        removed = 0
        for ds, d in items:
            if ds < keep_after:
                logger.info("清理超期快照:%s", d)
                shutil.rmtree(d, ignore_errors=True)
                removed += 1
        if removed:
            logger.info("保留策略执行完成,删除 %d 个快照目录", removed)

    @retry(max_retries=DEFAULT_RETRIES, pause_seconds=3.0)
    def backup_once(self, force_full: bool = False):
        # 确保目标目录存在
        ensure_dir(self.snapshots_dir)
        ensure_dir(self.meta_dir)

        # 挂载 SMB
        mounter = SMBMounter(self.smb_url, str(self.mount_point))
        mounter.mount(os.environ.get("BACKUP_USER"), os.environ.get("BACKUP_PASS"))

        today = dt.date.today()
        snap_dir = self._snapshot_dir(today)
        ensure_dir(snap_dir)

        snap_type = "FULL" if force_full or self._need_full() else "INCR"
        self._write_type_marker(snap_dir, snap_type)
        logger.info("开始备份(%s):%s", snap_type, today.strftime("%Y-%m-%d"))

        # 钩子:停止 Agent
        try:
            self._run_hook_stop()
        except Exception as e:
            logger.warning("停止钩子执行失败(继续):%s", e)

        try:
            # 遍历项目
            for project, base in self._project_list():
                prev_manifest = self._load_manifest(project)
                current_state = self._scan_current_state(base)
                if snap_type == "FULL":
                    changed = sorted(current_state.keys())
                    deletions: List[str] = []
                else:
                    changed, deletions = self._compute_delta(prev_manifest.get("files", {}), current_state)

                # 即便 changed 为空,也创建一个最小 tar 以标记项目(可选)
                tar_path, sha_path = self._archive_project(project, base, changed, snap_dir, snap_type, today)
                # 写删除集(仅增量)
                self._write_deletions(project, deletions, snap_dir, today)

                # 更新 manifest(成为下一次基线)
                new_manifest = {
                    "project": project,
                    "timestamp": now_local().isoformat(),
                    "files": current_state,
                }
                self._save_manifest(project, new_manifest)
                # 快照中也保存一份项目 manifest(可选,辅助定位)
                (snap_dir / f"{project}-manifest.json").write_text(
                    json.dumps(new_manifest, ensure_ascii=False, indent=2),
                    encoding="utf-8",
                )
        finally:
            # 钩子:恢复 Agent
            try:
                self._run_hook_start()
            except Exception as e:
                logger.warning("恢复钩子执行失败:%s", e)

            # 不强制卸载,留给系统/调度使用;如需可以在外层调用 unmount

        # 保留策略清理
        self._cleanup_retention()

        logger.info("备份完成(%s):%s", snap_type, today.strftime("%Y-%m-%d"))

    def list_snapshots(self) -> List[Tuple[str, str]]:
        res: List[Tuple[str, str]] = []
        if not self.snapshots_dir.exists():
            return res
        for d in sorted(list_subdirs(self.snapshots_dir)):
            t = (d / "TYPE").read_text(encoding="utf-8").strip().upper() if (d / "TYPE").exists() else "UNKNOWN"
            res.append((d.name, t))
        return res

    def _locate_restore_chain(self, target_date: dt.date) -> List[Tuple[dt.date, pathlib.Path, str]]:
        # 找到 <= target_date 的快照链:最近的 FULL 及之后直到 target_date 的所有快照
        items: List[Tuple[dt.date, pathlib.Path, str]] = []
        for d in list_subdirs(self.snapshots_dir):
            ds = parse_date_dir_name(d.name)
            if not ds or ds > target_date:
                continue
            t = (d / "TYPE").read_text(encoding="utf-8").strip().upper() if (d / "TYPE").exists() else "UNKNOWN"
            items.append((ds, d, t))
        items.sort()
        # 找最近的 FULL
        last_full_idx = -1
        for i in range(len(items) - 1, -1, -1):
            if items[i][2] == "FULL":
                last_full_idx = i
                break
        if last_full_idx == -1:
            raise RuntimeError("未找到可用的 FULL 快照")
        chain = items[last_full_idx:]
        return chain

    def _list_project_archives(self, snap_dir: pathlib.Path, project: str) -> List[pathlib.Path]:
        return sorted(snap_dir.glob(f"{project}-*.tar.gz"))

    def _read_deletions(self, snap_dir: pathlib.Path, project: str) -> List[str]:
        res: List[str] = []
        for p in snap_dir.glob(f"deletions-{project}-*.txt"):
            try:
                res += [line.strip() for line in p.read_text(encoding="utf-8").splitlines() if line.strip()]
            except Exception:
                continue
        return res

    def _safe_extract(self, tf: tarfile.TarFile, dest: pathlib.Path, preserve_perms: bool):
        # 安全解压,防止路径穿越
        for m in tf.getmembers():
            # 仅允许相对路径
            if m.name.startswith("/") or ".." in pathlib.PurePosixPath(m.name).parts:
                logger.warning("检测到可疑条目(忽略):%s", m.name)
                continue
            target = dest / m.name
            target_resolved = target.resolve().absolute()
            if not str(target_resolved).startswith(str(dest.resolve().absolute())):
                logger.warning("越界解压(忽略):%s", m.name)
                continue
            # 解压
            tf.extract(m, path=dest)
            if preserve_perms and hasattr(os, "chmod"):
                try:
                    if m.mode is not None:
                        os.chmod(target_resolved, m.mode)
                    # 尝试保留 mtime
                    if m.mtime is not None:
                        os.utime(target_resolved, (m.mtime, m.mtime), follow_symlinks=False)
                except Exception:
                    pass

    @retry(max_retries=DEFAULT_RETRIES, pause_seconds=3.0)
    def restore(
        self,
        target_yyyymmdd: str,
        dest_override: Optional[str],
        projects: Optional[List[str]],
        dry_run: bool,
        preserve_perms: bool = True,
    ):
        mounter = SMBMounter(self.smb_url, str(self.mount_point))
        mounter.mount(os.environ.get("BACKUP_USER"), os.environ.get("BACKUP_PASS"))

        target_date = dt.datetime.strptime(target_yyyymmdd, "%Y%m%d").date()
        chain = self._locate_restore_chain(target_date)
        logger.info("恢复目标日期:%s;包含 %d 个快照", target_yyyymmdd, len(chain))

        # 选择项目
        all_projects = dict(self._project_list())
        selected: List[Tuple[str, pathlib.Path]] = []
        if projects:
            for p in projects:
                base = all_projects.get(p)
                if not base:
                    raise RuntimeError(f"未知项目:{p}")
                selected.append((p, base))
        else:
            selected = list(all_projects.items())

        for project, base in selected:
            dest_base = pathlib.Path(dest_override).resolve() if dest_override else base
            ensure_dir(dest_base)
            logger.info("恢复项目:%s -> %s", project, dest_base)

            # 依序应用 FULL + INCR
            archives: List[pathlib.Path] = []
            deletions_accum: List[str] = []
            for ds, d, t in chain:
                for arc in self._list_project_archives(d, project):
                    archives.append(arc)
                # 收集删除集
                deletions_accum += self._read_deletions(d, project)

            if dry_run:
                logger.info("[dry-run] 将解压 %d 个归档:%s", len(archives), ", ".join(a.name for a in archives))
                if deletions_accum:
                    logger.info("[dry-run] 将删除 %d 个文件(若存在)", len(deletions_accum))
                continue

            # 解压归档
            for arc in archives:
                logger.info("解压:%s", arc)
                with tarfile.open(arc, "r:gz") as tf:
                    self._safe_extract(tf, dest_base, preserve_perms)

            # 应用删除集
            unique_deletions = sorted(set(deletions_accum))
            removed = 0
            for rel in unique_deletions:
                target = dest_base / rel
                try:
                    if target.is_file() or target.is_symlink():
                        target.unlink(missing_ok=True)
                        removed += 1
                    elif target.is_dir():
                        shutil.rmtree(target, ignore_errors=True)
                        removed += 1
                except Exception:
                    pass
            if unique_deletions:
                logger.info("应用删除集完成:删除 %d", removed)

        logger.info("恢复完成:%s", target_yyyymmdd)

    def verify(self, target_yyyymmdd: Optional[str]):
        mounter = SMBMounter(self.smb_url, str(self.mount_point))
        mounter.mount(os.environ.get("BACKUP_USER"), os.environ.get("BACKUP_PASS"))

        # 选择快照
        if target_yyyymmdd:
            snap_dir = self.snapshots_dir / target_yyyymmdd
            if not snap_dir.exists():
                raise RuntimeError(f"快照不存在:{snap_dir}")
        else:
            snapshots = sorted(list_subdirs(self.snapshots_dir))
            if not snapshots:
                raise RuntimeError("无可用快照")
            snap_dir = snapshots[-1]

        logger.info("校验快照:%s", snap_dir.name)
        for sha in snap_dir.glob("*.sha256"):
            line = sha.read_text(encoding="utf-8").strip()
            if "  " in line:
                expect, name = line.split("  ", 1)
            else:
                parts = line.split()
                expect = parts[0]
                name = parts[-1]
            arc = snap_dir / name
            if not arc.exists():
                logger.error("缺少归档:%s", arc)
                continue
            got = sha256_file(arc)
            if got != expect:
                raise RuntimeError(f"校验失败:{arc} (期望 {expect},实际 {got})")
            logger.info("OK: %s", name)
        logger.info("校验完成:%s", snap_dir.name)

    def install_schedule(self, label: str, run_time: str, env_file: Optional[str], script_path: str):
        # 生成 launchd plist(/Library/LaunchDaemons/)
        hh, mm = run_time.split(":")
        plist = f"""<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key><string>{label}</string>
    <key>ProgramArguments</key>
    <array>
        <string>{sys.executable}</string>
        <string>{script_path}</string>
        <string>backup</string>
    </array>
    <key>StartCalendarInterval</key>
    <dict>
        <key>Hour</key><integer>{int(hh)}</integer>
        <key>Minute</key><integer>{int(mm)}</integer>
    </dict>
    <key>RunAtLoad</key><false/>
    <key>StandardOutPath</key><string>{DEFAULT_LOG_PATH}</string>
    <key>StandardErrorPath</key><string>{DEFAULT_LOG_PATH}</string>
    <key>EnvironmentVariables</key>
    <dict>
        <!-- 若使用 env_file,请在外部管理并由 launchd 的 wrapper 加载;这里通常留空 -->
    </dict>
</dict>
</plist>
"""
        plist_path = f"/Library/LaunchDaemons/{label}.plist"
        pathlib.Path(plist_path).write_text(plist, encoding="utf-8")
        logger.info("已写入 launchd plist:%s", plist_path)
        # 加载
        subprocess.check_call(["launchctl", "load", "-w", plist_path])
        logger.info("已加载定时任务(每日 %s)", run_time)


# =========================
# CLI
# =========================

def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description="macOS 项目级增量/全量备份与恢复工具")
    sub = p.add_subparsers(dest="cmd", required=True)

    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--sources", nargs="+", default=DEFAULT_SOURCES, help="源目录(默认:/Users/build/Projects /Users/build/.ssh)")
    common.add_argument("--exclude", nargs="+", default=DEFAULT_EXCLUDES, help="排除模式(支持 ** 通配)")
    common.add_argument("--smb-url", default=DEFAULT_SMB_URL, help="SMB 目标,如 smb://10.0.30.20/backup/build")
    common.add_argument("--mount-point", default=DEFAULT_MOUNT_POINT, help="挂载点")
    common.add_argument("--retention-days", type=int, default=DEFAULT_RETENTION_DAYS, help="保留天数(默认7)")
    common.add_argument("--full-interval-days", type=int, default=DEFAULT_FULL_INTERVAL_DAYS, help="全量间隔天数(默认7)")
    common.add_argument("--agent-label", default=os.environ.get("BUILD_AGENT_LABEL"), help="构建Agent的 launchd Label(可选)")
    common.add_argument("--pre-stop-cmd", default=os.environ.get("HOOK_STOP"), help="备份前停止命令(可选)")
    common.add_argument("--post-start-cmd", default=os.environ.get("HOOK_START"), help="备份后恢复命令(可选)")
    common.add_argument("--env-file", default=os.environ.get("OP_BACKUP_ENV_FILE"), help="包含 BACKUP_USER/BACKUP_PASS 的安全 env 文件路径")
    common.add_argument("--host-name", default=None, help="覆盖主机名,用于备份路径分隔")

    p_backup = sub.add_parser("backup", parents=[common], help="执行一次备份")
    p_backup.add_argument("--force-full", action="store_true", help="强制全量备份")
    p_backup.add_argument("--no-mount", action="store_true", help="不自动挂载(假定已挂载)")

    p_restore = sub.add_parser("restore", parents=[common], help="恢复到指定日期")
    p_restore.add_argument("--date", required=True, help="目标日期 YYYYMMDD")
    p_restore.add_argument("--project", nargs="*", help="指定项目(默认全部)")
    p_restore.add_argument("--dest", default=None, help="恢复目标根目录(默认恢复到原路径)")
    p_restore.add_argument("--dry-run", action="store_true", help="仅预演,不落盘")
    p_restore.add_argument("--no-preserve-perms", action="store_true", help="不保留权限/时间戳")

    p_list = sub.add_parser("list", parents=[common], help="列出快照")
    p_verify = sub.add_parser("verify", parents=[common], help="校验快照 sha256")
    p_verify.add_argument("--date", default=None, help="目标日期 YYYYMMDD(默认最新)")

    p_sched = sub.add_parser("install-schedule", parents=[common], help="安装每日 02:30 定时任务(launchd)")
    p_sched.add_argument("--time", default="02:30", help="HH:MM,默认 02:30")
    p_sched.add_argument("--label", default=SCRIPT_LABEL, help="launchd Label")
    p_sched.add_argument("--script-path", default=os.path.abspath(sys.argv[0]), help="脚本绝对路径")

    return p


def main():
    parser = build_parser()
    args = parser.parse_args()

    # 读取 env 文件(如提供)
    if getattr(args, "env_file", None):
        load_env_file(args.env_file)

    mgr = BackupManager(
        sources=args.sources,
        exclude_patterns=args.exclude,
        mount_point=args.mount_point,
        smb_url=args.smb_url,
        retention_days=args.retention_days,
        full_interval_days=args.full_interval_days,
        agent_label=args.agent_label,
        pre_stop_cmd=args.pre_stop_cmd,
        post_start_cmd=args.post_start_cmd,
        host_name=args.host_name,
    )

    if args.cmd == "backup":
        mgr.backup_once(force_full=args.force_full)

    elif args.cmd == "restore":
        mgr.restore(
            target_yyyymmdd=args.date,
            dest_override=args.dest,
            projects=args.project,
            dry_run=args.dry_run,
            preserve_perms=not args.no_preserve_perms,
        )

    elif args.cmd == "list":
        snaps = mgr.list_snapshots()
        for d, t in snaps:
            print(f"{d}  {t}")

    elif args.cmd == "verify":
        mgr.verify(args.date)

    elif args.cmd == "install-schedule":
        mgr.install_schedule(args.label, args.time, args.env_file, args.script_path)

    else:
        parser.print_help()


if __name__ == "__main__":
    main()

参数说明

  • --sources
    • 含义:需要备份的源目录列表
    • 默认:/Users/build/Projects /Users/build/.ssh
  • --exclude
    • 含义:备份/遍历时的排除通配模式(支持 **)
    • 默认:/DerivedData/, /Library/Caches/, **/*.tmp, /node_modules/
  • --smb-url
  • --mount-point
    • 含义:本地挂载点(脚本会尝试将 Finder 挂载映射到此路径)
    • 默认:/Volumes/build_backup
  • --retention-days
    • 含义:保留最近 N 天快照
    • 默认:7
  • --full-interval-days
    • 含义:全量间隔天数,距最近全量 >= (N-1) 天则当日执行全量
    • 默认:7(约每周一次全量)
  • --agent-label
    • 含义:备份前后通过 launchctl stop/start 控制的构建 Agent 标签(可选)
    • 默认:读取环境变量 BUILD_AGENT_LABEL
  • --pre-stop-cmd / --post-start-cmd
    • 含义:自定义停止/恢复命令(可选,优先于 --agent-label)
    • 默认:读取 HOOK_STOP / HOOK_START 环境变量
  • --env-file
    • 含义:包含 BACKUP_USER/BACKUP_PASS 的外部安全 env 文件路径(避免在 plist 中明文)
    • 默认:读取 OP_BACKUP_ENV_FILE 环境变量
  • --host-name
    • 含义:覆盖用于备份路径分隔的主机名(默认取当前主机名)
  • backup 子命令
    • --force-full:强制全量一次
  • restore 子命令
    • --date:恢复目标日期(YYYYMMDD)
    • --project:指定项目名(默认恢复全部)
    • --dest:恢复目标根目录(默认恢复到原目录)
    • --dry-run:只输出将执行的操作,不实际写入
    • --no-preserve-perms:不保留文件权限与时间戳
  • verify 子命令
    • --date:校验指定日期快照(默认最新)
  • install-schedule 子命令
    • --time:每日运行时间(HH:MM)
    • --label:launchd 任务 Label
    • --script-path:脚本绝对路径

环境变量:

  • BACKUP_USER / BACKUP_PASS:SMB 认证账号与密码
  • BUILD_AGENT_LABEL:构建 Agent 的 launchd Label(可选)
  • HOOK_STOP / HOOK_START:自定义钩子命令(可选)
  • OP_BACKUP_ENV_FILE:env 文件路径(可选)

使用示例

  1. 手动执行一次备份(自动判定增量/全量)
  • 前置:确保已导出 BACKUP_USER/BACKUP_PASS 或提供 --env-file
  • 命令: sudo ./op_backup.py backup --env-file /etc/op_backup.env
  1. 强制全量备份(例如每周一手动触发)
  • 命令: sudo ./op_backup.py backup --force-full --env-file /etc/op_backup.env
  1. 恢复到指定日期,先 dry-run 查看动作,再实际恢复
  • 预演: sudo ./op_backup.py restore --date 20250115 --dry-run
  • 实际恢复(保留权限),仅恢复某个项目到临时目录: sudo ./op_backup.py restore --date 20250115 --project MyApp --dest /tmp/restore/MyApp
  1. 校验最新快照 sha256
  • 命令: sudo ./op_backup.py verify
  1. 安装每日 02:30 定时任务(launchd)
  • 写入 env 文件(权限 600): sudo sh -c 'echo "BACKUP_USER=buildsvc" > /etc/op_backup.env' sudo sh -c 'echo "BACKUP_PASS=********" >> /etc/op_backup.env' sudo chmod 600 /etc/op_backup.env
  • 安装 plist: sudo OP_BACKUP_ENV_FILE=/etc/op_backup.env ./op_backup.py install-schedule --time 02:30
  1. 配置构建 Agent 钩子(例如 Jenkins)
  • 通过 launchctl Label: sudo BUILD_AGENT_LABEL=org.jenkins-ci ./op_backup.py backup
  • 或自定义命令(优先于 Label): sudo HOOK_STOP="brew services stop jenkins" HOOK_START="brew services start jenkins" ./op_backup.py backup

注意事项

  • 执行前的准备
    • 确保 Python 3.8+ 与管理员权限(sudo)
    • 目标 SMB 共享可达,确保 BACKUP_USER/BACKUP_PASS 有写权限
    • 建议创建 /etc/op_backup.env 保存凭证(600 权限),通过 --env-file 或 OP_BACKUP_ENV_FILE 使用
    • 首次运行会创建挂载点 /Volumes/build_backup 和备份目录结构
  • 风险与防范
    • 脚本对删除操作严格限制在备份目录(清理策略)和恢复目标根目录(删除集)内,并做路径越界校验
    • SMB 挂载使用 AppleScript 从 stdin 传递密码,降低密码出现在进程列表的风险;仍建议使用受限账号
    • 恢复含“删除集”时会删除目标目录中相对路径对应文件,建议先 dry-run 或恢复到临时目录验证
    • 日志写入 /var/log/op_backup.log,需要 root;如失败将回退控制台输出
  • 性能与资源
    • 备份使用 tar.gz 压缩,CPU 占用取决于数据量与压缩等级(默认 6);可调整代码中的 compresslevel
    • sha256 校验会对归档进行二次顺序读,磁盘与网络 IO 会增加
    • 增量是通过本地状态清单(mtime+size)判定,极端情况下相同 mtime+size 的少量变更可能无法检测,建议关键文件另行校验或开启更严格策略(可拓展为记录哈希)
  • 其他
    • “每周全量”通过距离最近 FULL 的天数判定(默认 >=6 天);也可使用 --force-full 随时触发
    • 项目识别规则:/Users/build/Projects 下一级目录每个为一个项目;/Users/build/.ssh 作为独立项目“ssh”
    • 脚本恢复默认保留权限与时间戳;所有操作均记录日志,便于审计与追踪
    • 本脚本不对 Keychain 写入凭证,不持久化敏感信息,符合不硬编码敏感信息的规范

示例详情

解决的问题

让运维团队在几分钟内从“需求描述”直达“可执行脚本”,覆盖常见场景(如监控告警、日志排障、数据备份与回滚、资源调优、安全巡检),以更少的人力和更低风险完成重复与关键操作。通过标准化、可配置、带注释的脚本生成,显著缩短交付周期,降低误操作概率,提升跨系统环境(Linux/Windows)的稳定性与合规性,最终实现效率提升与成本优化,促进试用转为付费。

适用用户

企业运维工程师

利用提示词快速生成监控、日志采集、备份恢复与性能优化脚本;按环境自动选择语言;输出注释与使用指南,提升交付速度并降低误操作。

DevOps工程师

在发布流水线中一键生成健康检查、滚动备份与资源巡检脚本;结构化日志与错误处理,便于持续集成与问题追踪,缩短故障修复时间。

中小企业系统管理员

不必深度编程,即可建立定时备份、磁盘空间告警、服务自检与日志打包脚本;标准化配置参数,快速覆盖Linux与Windows主机。

特征总结

根据服务器类型与任务场景,轻松生成可直接执行的运维脚本,减少手写成本与上线等待。
自动应用安全规范与最佳实践,内置风险防护与权限提示,降低误操作和合规隐患。
支持系统监控、日志分析、备份恢复与性能优化,一键切换场景模板快速落地。
自适应Linux与Windows环境,智能选择脚本语言与命令集,避免兼容性踩坑。
自动注释与使用说明齐备,开箱即跑,新人也能快速理解参数与执行步骤。
结构化错误处理与日志记录内置,问题可追踪可复盘,减少线上排查时间。
可按需定制参数与模块,批量生成标准化脚本,便于团队复用与版本管理。
生成前进行需求梳理与实现路径评估,提前提示风险与资源占用,稳妥上线。
提供多场景演示命令与范例,复制即用,缩短从方案到生产的准备时间。
全流程校验与性能优化建议,让脚本更稳更快,提升关键运维任务完成率。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥30.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 740 tokens
- 5 个可调节参数
{ 服务器类型 } { 运维任务类型 } { 配置参数 } { 操作系统环境 } { 脚本语言偏好 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59