热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词专为IT运维人员设计,能够根据用户输入的服务器类型、运维任务类型和具体配置参数,自动生成专业、安全、高效的运维脚本。通过深度分析用户需求,结合最佳实践和安全性考量,提供可立即执行的脚本代码,显著提升运维效率,降低人为错误风险,确保系统稳定运行。支持多种主流操作系统和常见运维场景,包括系统监控、日志分析、备份恢复、性能优化等任务。
功能描述:批量安装 Node Exporter v1.6.x 并做安全加固(systemd 托管、专用低权限用户、防火墙仅放行 Prometheus、自检失败自动回滚),可选生成配套告警规则。
适用环境:主流 Linux 发行版(需 systemd,firewalld/ufw/iptables 任一即可),支持 SSH 远程批量执行与本地执行。
执行权限:root(本地模式需 sudo 执行;远程模式经 SSH 登录后由 sudo 提权)。
#!/usr/bin/env bash
# Node Exporter v1.6.x 安装与安全加固(多主机/本地均可)
# - 安装到 /opt/node_exporter/bin/node_exporter
# - 自定义参数文件:/etc/node_exporter/flags.env(保留用户自定义)
# - 脚本强制参数文件:/etc/node_exporter/ops-required.env(仅添加/覆盖必要过滤)
# - systemd 服务:/etc/systemd/system/node_exporter.service
# - 日志:/var/log/ops/monitor_install.log(logrotate 保留7天)
# - 防火墙仅放行 10.0.20.5:9100,优先 firewalld/ufw,不存在则使用 iptables(尽量最小化影响)
# - 自检失败自动回滚
set -euo pipefail
umask 027
# ===== 默认参数(可被命令行覆盖) =====
TARGETS_DEFAULT="10.0.12.15,10.0.12.16,10.0.12.17"
PROM_IP_DEFAULT="10.0.20.5"
PROM_PORT_DEFAULT="9100"
NODE_EXPORTER_VERSION_DEFAULT="1.6.1"
NET_IF_DEFAULT="eth0"
LOG_FILE_DEFAULT="/var/log/ops/monitor_install.log"
SSH_USER_DEFAULT="root"
SSH_PORT_DEFAULT="22"
SSH_OPTS_DEFAULT="-o BatchMode=yes -o StrictHostKeyChecking=accept-new -o ConnectTimeout=5 -o ServerAliveInterval=30"
DOWNLOAD_URL_DEFAULT="" # 留空时自动拼接 GitHub Releases
MODE_DEFAULT="remote" # remote | local
GENERATE_RULES_DEFAULT="false"
# ===== 全局变量(初始化为默认) =====
TARGETS="$TARGETS_DEFAULT"
PROM_IP="$PROM_IP_DEFAULT"
PROM_PORT="$PROM_PORT_DEFAULT"
NODE_EXPORTER_VERSION="$NODE_EXPORTER_VERSION_DEFAULT"
NET_IF="$NET_IF_DEFAULT"
LOG_FILE="$LOG_FILE_DEFAULT"
SSH_USER="$SSH_USER_DEFAULT"
SSH_PORT="$SSH_PORT_DEFAULT"
SSH_OPTS="$SSH_OPTS_DEFAULT"
DOWNLOAD_URL="$DOWNLOAD_URL_DEFAULT"
MODE="$MODE_DEFAULT"
GENERATE_RULES="$GENERATE_RULES_DEFAULT"
DRY_RUN="false"
print_usage() {
cat <<EOF
用法: $0 [选项]
-H, --hosts 目标主机列表(逗号分隔),默认: ${TARGETS_DEFAULT}
-m, --mode 执行模式: remote|local,默认: ${MODE_DEFAULT}
-u, --ssh-user SSH 用户,默认: ${SSH_USER_DEFAULT}
-p, --ssh-port SSH 端口,默认: ${SSH_PORT_DEFAULT}
-o, --ssh-opts 追加 SSH 选项(附加到默认值)
-v, --version node_exporter 版本,默认: ${NODE_EXPORTER_VERSION_DEFAULT}
--prom-ip 允许访问 9100 的 Prometheus IP,默认: ${PROM_IP_DEFAULT}
--prom-port node_exporter 端口,默认: ${PROM_PORT_DEFAULT}
--net-if 网卡白名单(netdev include),默认: ${NET_IF_DEFAULT}
--log-file 安装日志文件,默认: ${LOG_FILE_DEFAULT}
--download-url 指定 node_exporter 压缩包 URL(可指向内网镜像);留空使用 GitHub
--dry-run 试运行(不更改远端,仅打印将执行的操作)
--generate-rules 在本地输出 Prometheus 告警规则 YAML 到标准输出后退出
-h, --help 显示帮助
示例(远程多主机):
$0 -H "10.0.12.15,10.0.12.16,10.0.12.17" --prom-ip 10.0.20.5
示例(本地主机):
sudo $0 -m local --prom-ip 10.0.20.5
示例(生成告警规则):
$0 --generate-rules > node.rules.yml
EOF
}
log() { echo "[$(date '+%F %T')] [INFO] $*"; }
warn() { echo "[$(date '+%F %T')] [WARN] $*" >&2; }
error() { echo "[$(date '+%F %T')] [ERROR] $*" >&2; }
parse_args() {
while [[ $# -gt 0 ]]; do
case "$1" in
-H|--hosts) TARGETS="$2"; shift 2;;
-m|--mode) MODE="$2"; shift 2;;
-u|--ssh-user) SSH_USER="$2"; shift 2;;
-p|--ssh-port) SSH_PORT="$2"; shift 2;;
-o|--ssh-opts) SSH_OPTS="$SSH_OPTS $2"; shift 2;;
-v|--version) NODE_EXPORTER_VERSION="$2"; shift 2;;
--prom-ip) PROM_IP="$2"; shift 2;;
--prom-port) PROM_PORT="$2"; shift 2;;
--net-if) NET_IF="$2"; shift 2;;
--log-file) LOG_FILE="$2"; shift 2;;
--download-url) DOWNLOAD_URL="$2"; shift 2;;
--dry-run) DRY_RUN="true"; shift 1;;
--generate-rules) GENERATE_RULES="true"; shift 1;;
-h|--help) print_usage; exit 0;;
*) error "未知参数: $1"; print_usage; exit 2;;
esac
done
}
generate_rules_yaml() {
# 生成与需求一致的告警规则(供 10.0.20.5 的 Prometheus 引用)
# 过滤 tmpfs/docker/snap, 网卡仅计入 eth0 的逻辑在 exporter 端已控制;此处按常规过滤增强鲁棒性
cat <<'YAML'
groups:
- name: node_basic_alerts
rules:
- alert: HighCpuUsage
expr: |
avg by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[5m])) > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "CPU 使用率高 (instance={{ $labels.instance }})"
description: "CPU 使用率持续5m超过85%"
- alert: HighMemoryUsage
expr: |
(1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)) > 0.90
for: 5m
labels:
severity: warning
annotations:
summary: "内存使用率高 (instance={{ $labels.instance }})"
description: "内存使用率超过90%"
- alert: DiskUsageHighVar
expr: |
(node_filesystem_size_bytes{mountpoint="/var",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}
- node_filesystem_free_bytes{mountpoint="/var",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"})
/ node_filesystem_size_bytes{mountpoint="/var",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"} > 0.80
for: 10m
labels:
severity: warning
annotations:
summary: "/var 磁盘使用率高 (instance={{ $labels.instance }})"
description: "/var 使用率超过80%"
- alert: DiskUsageHighHome
expr: |
(node_filesystem_size_bytes{mountpoint="/home",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}
- node_filesystem_free_bytes{mountpoint="/home",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"})
/ node_filesystem_size_bytes{mountpoint="/home",fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"} > 0.80
for: 10m
labels:
severity: warning
annotations:
summary: "/home 磁盘使用率高 (instance={{ $labels.instance }})"
description: "/home 使用率超过80%"
- alert: InodeLow
expr: |
(node_filesystem_files_free{fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}
/ node_filesystem_files{fstype!~"tmpfs|squashfs|nsfs|overlay|aufs|bcachefs",device!~"docker.*|containerd.*|loop.*|snap.*"}) < 0.10
for: 10m
labels:
severity: warning
annotations:
summary: "inode 剩余低 (instance={{ $labels.instance }})"
description: "inode 剩余少于10%"
YAML
}
# ===== 远端执行脚本(通过 SSH 下发) =====
remote_payload() {
# 变量从环境传入:PROM_IP PROM_PORT NODE_EXPORTER_VERSION NET_IF LOG_FILE DOWNLOAD_URL DRY_RUN
cat <<'REMOTE_EOF'
set -euo pipefail
umask 027
PROM_IP="${PROM_IP:-10.0.20.5}"
PROM_PORT="${PROM_PORT:-9100}"
NODE_EXPORTER_VERSION="${NODE_EXPORTER_VERSION:-1.6.1}"
NET_IF="${NET_IF:-eth0}"
LOG_FILE="${LOG_FILE:-/var/log/ops/monitor_install.log}"
DOWNLOAD_URL="${DOWNLOAD_URL:-}"
DRY_RUN="${DRY_RUN:-false}"
NE_USER="monitor"
NE_GROUP="monitor"
NE_HOME="/var/lib/node_exporter"
NE_BIN_DIR="/opt/node_exporter/bin"
NE_BIN="${NE_BIN_DIR}/node_exporter"
NE_ETC="/etc/node_exporter"
NE_ENV_CUSTOM="${NE_ETC}/flags.env"
NE_ENV_REQUIRED="${NE_ETC}/ops-required.env"
NE_SERVICE="/etc/systemd/system/node_exporter.service"
NE_STATE="${NE_HOME}/install_state"
BACKUP_DIR_BASE="/opt/node_exporter/backup"
TS="$(date +%Y%m%d%H%M%S)"
BACKUP_DIR="${BACKUP_DIR_BASE}/${TS}"
mkdir -p "$(dirname "${LOG_FILE}")"
touch "${LOG_FILE}"
chmod 0640 "${LOG_FILE}"
exec &> >(tee -a "${LOG_FILE}")
log() { echo "[$(date '+%F %T')] [INFO] $*"; }
warn() { echo "[$(date '+%F %T')] [WARN] $*" >&2; }
error() { echo "[$(date '+%F %T')] [ERROR] $*" >&2; }
require_root() {
if [[ "$(id -u)" -ne 0 ]]; then
error "需要 root 权限执行"
exit 1
fi
}
setup_logrotate() {
local lr="/etc/logrotate.d/ops-monitor-install"
if [[ ! -f "$lr" ]]; then
cat > "$lr" <<EOF
${LOG_FILE} {
daily
rotate 7
missingok
compress
notifempty
copytruncate
}
EOF
log "创建 logrotate 规则: $lr"
fi
}
detect_arch() {
local m="$(uname -m)"
case "$m" in
x86_64) echo "amd64" ;;
aarch64) echo "arm64" ;;
armv7l) echo "armv7" ;;
ppc64le) echo "ppc64le" ;;
s390x) echo "s390x" ;;
*) error "不支持的架构: $m"; exit 1;;
esac
}
get_current_version() {
if [[ -x "${NE_BIN}" ]]; then
"${NE_BIN}" --version 2>/dev/null | awk 'NR==1{print $3}' || true
else
echo ""
fi
}
download_node_exporter() {
local arch="$1"
local ver="$2"
local url
local tmpdir="/tmp/node_exporter_${ver}_${arch}_${TS}"
mkdir -p "$tmpdir"
if [[ -n "${DOWNLOAD_URL}" ]]; then
url="${DOWNLOAD_URL}"
else
url="https://github.com/prometheus/node_exporter/releases/download/v${ver}/node_exporter-${ver}.linux-${arch}.tar.gz"
fi
log "下载 node_exporter ${ver} (${arch}) from ${url}"
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] curl -fSL ${url} -o ${tmpdir}/node_exporter.tar.gz"
return 0
fi
if command -v curl >/dev/null 2>&1; then
curl -fSL "${url}" -o "${tmpdir}/node_exporter.tar.gz"
elif command -v wget >/dev/null 2>&1; then
wget -O "${tmpdir}/node_exporter.tar.gz" "${url}"
else
error "需要 curl 或 wget"
exit 1
fi
tar -C "${tmpdir}" -xzf "${tmpdir}/node_exporter.tar.gz"
local src="${tmpdir}/node_exporter-${ver}.linux-${arch}/node_exporter"
if [[ ! -f "${src}" ]]; then
error "解压后未找到 node_exporter 二进制"
exit 1
fi
mkdir -p "${NE_BIN_DIR}"
install -m 0755 "${src}" "${NE_BIN}"
}
create_user_group() {
local created="false"
if ! getent group "${NE_GROUP}" >/dev/null; then
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] groupadd --system ${NE_GROUP}"
else
groupadd --system "${NE_GROUP}"
fi
created="true"
fi
local nologin="/usr/sbin/nologin"
[[ -x "$nologin" ]] || nologin="/sbin/nologin"
if ! id -u "${NE_USER}" >/dev/null 2>&1; then
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] useradd --system --home ${NE_HOME} --shell ${nologin} -g ${NE_GROUP} ${NE_USER}"
else
useradd --system --home "${NE_HOME}" --shell "${nologin}" -g "${NE_GROUP}" "${NE_USER}"
fi
created="true"
fi
mkdir -p "${NE_HOME}" "${NE_ETC}"
chown -R "${NE_USER}:${NE_GROUP}" "${NE_HOME}"
if [[ "${created}" == "true" ]]; then
echo "CREATED_USER=1" >> "${NE_STATE}.tmp"
fi
}
backup_existing() {
mkdir -p "${BACKUP_DIR}"
local backed="false"
if [[ -f "${NE_BIN}" ]]; then
cp -a "${NE_BIN}" "${BACKUP_DIR}/node_exporter.bak"
backed="true"
fi
if [[ -f "${NE_SERVICE}" ]]; then
cp -a "${NE_SERVICE}" "${BACKUP_DIR}/node_exporter.service.bak"
backed="true"
fi
if [[ -f "${NE_ENV_CUSTOM}" ]]; then
cp -a "${NE_ENV_CUSTOM}" "${BACKUP_DIR}/flags.env.bak"
backed="true"
fi
if [[ -f "${NE_ENV_REQUIRED}" ]]; then
cp -a "${NE_ENV_REQUIRED}" "${BACKUP_DIR}/ops-required.env.bak"
backed="true"
fi
if [[ "${backed}" == "true" ]]; then
echo "BACKUP_DIR=${BACKUP_DIR}" >> "${NE_STATE}.tmp"
log "已备份现有文件到 ${BACKUP_DIR}"
fi
}
write_required_env() {
# 仅放置本脚本要求的过滤选项,保留用户自定义 flags.env 不覆盖
# 过滤: 忽略 tmpfs、docker*、snap;网卡白名单 eth0;端口 PROM_PORT
# 使用 NODE_EXPORTER_REQUIRED_OPTS 变量,systemd 中按 custom + required 顺序传入,required 后置覆盖
local fs_types='^(tmpfs)$'
local mp_excl='^(/var/lib/docker/.+|/docker.+|/snap($|/).*)'
cat > "${NE_ENV_REQUIRED}" <<EOF
# 自动生成(请勿手动修改),自定义请写入 ${NE_ENV_CUSTOM}
NODE_EXPORTER_REQUIRED_OPTS="--web.listen-address=:${PROM_PORT} \\
--collector.filesystem.fs-types-exclude=${fs_types} \\
--collector.filesystem.mount-points-exclude=${mp_excl} \\
--collector.netdev.device-include=^(${NET_IF})$"
EOF
chown "${NE_USER}:${NE_GROUP}" "${NE_ENV_REQUIRED}"
chmod 0640 "${NE_ENV_REQUIRED}"
}
ensure_custom_env_exists() {
# 若用户未创建自定义文件,生成一个模板但不包含任何强制参数
if [[ ! -f "${NE_ENV_CUSTOM}" ]]; then
cat > "${NE_ENV_CUSTOM}" <<'EOF'
# 自定义 node_exporter 启动参数(可选)
# 示例:
# NODE_EXPORTER_OPTS="--collector.textfile.directory=/var/lib/node_exporter/textfile_collector"
NODE_EXPORTER_OPTS=""
EOF
chown "${NE_USER}:${NE_GROUP}" "${NE_ENV_CUSTOM}"
chmod 0640 "${NE_ENV_CUSTOM}"
fi
}
write_systemd_unit() {
local need_reload="false"
local unit_content="[Unit]
Description=Prometheus Node Exporter
Wants=network-online.target
After=network-online.target
[Service]
User=${NE_USER}
Group=${NE_GROUP}
Type=simple
EnvironmentFile=-${NE_ENV_CUSTOM}
EnvironmentFile=-${NE_ENV_REQUIRED}
ExecStart=${NE_BIN} \$NODE_EXPORTER_OPTS \$NODE_EXPORTER_REQUIRED_OPTS
Restart=on-failure
RestartSec=5s
NoNewPrivileges=true
ProtectSystem=full
ProtectHome=true
PrivateTmp=true
PrivateDevices=true
ProtectKernelTunables=true
ProtectKernelModules=true
LockPersonality=true
RestrictNamespaces=true
RestrictRealtime=true
SystemCallArchitectures=native
UMask=027
[Install]
WantedBy=multi-user.target
"
if [[ ! -f "${NE_SERVICE}" ]]; then
echo "${unit_content}" > "${NE_SERVICE}"
need_reload="true"
echo "CREATED_SERVICE=1" >> "${NE_STATE}.tmp"
else
# 尽量不覆盖已有服务。但若发现不是monitor用户或ExecStart不包含所需环境文件,则进行温和修正(先备份)。
if ! grep -q "^User=${NE_USER}$" "${NE_SERVICE}" || ! grep -q "${NE_ENV_REQUIRED}" "${NE_SERVICE}"; then
cp -a "${NE_SERVICE}" "${NE_SERVICE}.${TS}.bak"
echo "${unit_content}" > "${NE_SERVICE}"
need_reload="true"
echo "UPDATED_SERVICE=1" >> "${NE_STATE}.tmp"
log "已更新 systemd unit 并备份为 ${NE_SERVICE}.${TS}.bak"
fi
fi
chown root:root "${NE_SERVICE}"
chmod 0644 "${NE_SERVICE}"
if [[ "${need_reload}" == "true" ]]; then
systemctl daemon-reload
fi
}
ensure_permissions() {
chown -R "${NE_USER}:${NE_GROUP}" "${NE_BIN_DIR}" "${NE_ETC}"
}
start_enable_service() {
systemctl enable node_exporter >/dev/null 2>&1 || true
systemctl restart node_exporter
systemctl is-active --quiet node_exporter
}
# --- 防火墙处理 ---
fw_allow_firewalld() {
local changed="false"
if firewall-cmd --state >/dev/null 2>&1; then
# 移除可能存在的对 9100 的全局放行(谨慎)
if firewall-cmd --permanent --list-ports | grep -qw "${PROM_PORT}/tcp"; then
firewall-cmd --permanent --remove-port="${PROM_PORT}/tcp" || true
changed="true"
echo "FW_CHG=removed_global_port" >> "${NE_STATE}.tmp"
fi
# 添加仅对 PROM_IP 放行
local rule="rule family=ipv4 source address=${PROM_IP} port protocol=tcp port=${PROM_PORT} accept"
if ! firewall-cmd --permanent --query-rich-rule="$rule" >/dev/null; then
firewall-cmd --permanent --add-rich-rule="$rule"
changed="true"
echo "FW_CHG=add_allow_${PROM_IP}_${PROM_PORT}" >> "${NE_STATE}.tmp"
fi
if [[ "${changed}" == "true" ]]; then
firewall-cmd --reload || true
fi
return 0
fi
return 1
}
fw_allow_ufw() {
if command -v ufw >/dev/null 2>&1 && ufw status | grep -q "Status: active"; then
# 删除任何全局 9100 放行
if ufw status | grep -Eqw "${PROM_PORT}/tcp\s+ALLOW\s+Anywhere"; then
yes | ufw delete allow "${PROM_PORT}/tcp" || true
echo "FW_CHG=ufw_del_global" >> "${NE_STATE}.tmp"
fi
# 添加仅对 PROM_IP 放行
if ! ufw status | grep -Eqw "${PROM_PORT}/tcp\s+ALLOW\s+${PROM_IP}"; then
ufw allow proto tcp from "${PROM_IP}" to any port "${PROM_PORT}"
echo "FW_CHG=ufw_add_${PROM_IP}_${PROM_PORT}" >> "${NE_STATE}.tmp"
fi
ufw reload || true
return 0
fi
return 1
}
fw_allow_iptables() {
# 仅在 firewalld/ufw 不可用时使用;在 INPUT 链前插入规则:允许 PROM_IP:PORT,随后丢弃其它对该端口的访问
# 尽量幂等:先查询再插入
local ipt="iptables"
local ipt6="ip6tables"
if ! command -v ${ipt} >/dev/null 2>&1; then
warn "未检测到 firewalld/ufw/iptables,跳过端口限制(请手工加固)"
return 1
fi
# IPv4
if ! ${ipt} -C INPUT -p tcp --dport "${PROM_PORT}" -s "${PROM_IP}" -j ACCEPT 2>/dev/null; then
${ipt} -I INPUT 1 -p tcp --dport "${PROM_PORT}" -s "${PROM_IP}" -j ACCEPT
echo "FW_CHG=ipt_v4_accept_${PROM_IP}_${PROM_PORT}" >> "${NE_STATE}.tmp"
fi
if ! ${ipt} -C INPUT -p tcp --dport "${PROM_PORT}" -j DROP 2>/dev/null; then
${ipt} -I INPUT 2 -p tcp --dport "${PROM_PORT}" -j DROP
echo "FW_CHG=ipt_v4_drop_${PROM_PORT}" >> "${NE_STATE}.tmp"
fi
# IPv6(如不需要可忽略,保持最小改动:仅丢弃该端口)
if command -v ${ipt6} >/dev/null 2>&1; then
if ! ${ipt6} -C INPUT -p tcp --dport "${PROM_PORT}" -j DROP 2>/dev/null; then
${ipt6} -I INPUT 1 -p tcp --dport "${PROM_PORT}" -j DROP
echo "FW_CHG=ipt_v6_drop_${PROM_PORT}" >> "${NE_STATE}.tmp"
fi
fi
# 持久化(若可用)
if command -v netfilter-persistent >/dev/null 2>&1; then
netfilter-persistent save || true
elif command -v service >/dev/null 2>&1 && service netfilter-persistent status >/dev/null 2>&1; then
service netfilter-persistent save || true
elif command -v iptables-save >/dev/null 2>&1 && [[ -d /etc/iptables ]]; then
iptables-save > /etc/iptables/rules.v4 || true
command -v ip6tables-save >/dev/null 2>&1 && ip6tables-save > /etc/iptables/rules.v6 || true
else
warn "iptables 规则可能不会持久化,请根据发行版安装持久化工具(如 iptables-persistent)"
fi
return 0
}
apply_firewall() {
log "应用防火墙策略:仅允许 ${PROM_IP} 访问 ${PROM_PORT}/tcp"
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] 尝试 firewalld/ufw/iptables 依次应用"
return 0
fi
fw_allow_firewalld && return 0
fw_allow_ufw && return 0
fw_allow_iptables && return 0
warn "未成功应用任何防火墙加固,请手工限制对 ${PROM_PORT}/tcp 的访问来源为 ${PROM_IP}"
}
self_check() {
log "执行自检: curl http://127.0.0.1:${PROM_PORT}/metrics"
for i in {1..15}; do
if curl -fsS "http://127.0.0.1:${PROM_PORT}/metrics" | grep -q "^node_exporter_build_info"; then
log "自检成功"
return 0
fi
sleep 1
done
return 1
}
rollback() {
warn "开始回滚..."
systemctl stop node_exporter || true
if [[ -f "${NE_STATE}" ]]; then
# shellcheck disable=SC1090
source "${NE_STATE}" || true
fi
if [[ -n "${BACKUP_DIR:-}" && -d "${BACKUP_DIR}" ]]; then
[[ -f "${BACKUP_DIR}/node_exporter.bak" ]] && install -m 0755 "${BACKUP_DIR}/node_exporter.bak" "${NE_BIN}" || rm -f "${NE_BIN}"
[[ -f "${BACKUP_DIR}/node_exporter.service.bak" ]] && install -m 0644 "${BACKUP_DIR}/node_exporter.service.bak" "${NE_SERVICE}" || rm -f "${NE_SERVICE}"
[[ -f "${BACKUP_DIR}/flags.env.bak" ]] && install -m 0640 "${BACKUP_DIR}/flags.env.bak" "${NE_ENV_CUSTOM}" || true
[[ -f "${BACKUP_DIR}/ops-required.env.bak" ]] && install -m 0640 "${BACKUP_DIR}/ops-required.env.bak" "${NE_ENV_REQUIRED}" || rm -f "${NE_ENV_REQUIRED}"
systemctl daemon-reload || true
else
# 无备份:尽力清理我们创建的文件
rm -f "${NE_ENV_REQUIRED}" || true
fi
# 撤销防火墙变更(仅尝试删除我们添加的规则)
if command -v firewall-cmd >/dev/null 2>&1 && firewall-cmd --state >/dev/null 2>&1; then
local rule="rule family=ipv4 source address=${PROM_IP} port protocol=tcp port=${PROM_PORT} accept"
firewall-cmd --permanent --remove-rich-rule="$rule" || true
firewall-cmd --reload || true
fi
if command -v ufw >/dev/null 2>&1 && ufw status | grep -q "Status: active"; then
yes | ufw delete allow proto tcp from "${PROM_IP}" to any port "${PROM_PORT}" || true
ufw reload || true
fi
if command -v iptables >/dev/null 2>&1; then
iptables -D INPUT -p tcp --dport "${PROM_PORT}" -s "${PROM_IP}" -j ACCEPT 2>/dev/null || true
iptables -D INPUT -p tcp --dport "${PROM_PORT}" -j DROP 2>/dev/null || true
command -v netfilter-persistent >/dev/null 2>&1 && netfilter-persistent save || true
fi
# 删除用户(仅当我们创建过且系统中不再需要时)
if [[ -f "${NE_STATE}" ]] && grep -q "^CREATED_USER=1" "${NE_STATE}"; then
if id -u "${NE_USER}" >/dev/null 2>&1; then
userdel "${NE_USER}" || true
fi
getent group "${NE_GROUP}" >/dev/null 2>&1 && groupdel "${NE_GROUP}" || true
fi
warn "回滚完成"
}
finalize_state() {
if [[ -f "${NE_STATE}.tmp" ]]; then
mv "${NE_STATE}.tmp" "${NE_STATE}"
chown "${NE_USER}:${NE_GROUP}" "${NE_STATE}"
chmod 0640 "${NE_STATE}"
fi
}
main_remote() {
require_root
setup_logrotate
: > "${NE_STATE}.tmp" # 初始化临时状态文件
# 前置检测
for b in systemctl tar; do
command -v "$b" >/dev/null 2>&1 || { error "缺少必需命令: $b"; exit 1; }
done
# 版本判断与幂等
local arch
arch="$(detect_arch)"
local cur_ver
cur_ver="$(get_current_version || true)"
log "当前版本: ${cur_ver:-<未安装>} 目标版本: ${NODE_EXPORTER_VERSION}"
backup_existing
create_user_group
if [[ -z "${cur_ver}" || ! "${cur_ver}" =~ ^1\.6\.[0-9]+$ ]]; then
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] 将安装 node_exporter v${NODE_EXPORTER_VERSION} 到 ${NE_BIN}"
else
download_node_exporter "${arch}" "${NODE_EXPORTER_VERSION}"
echo "INSTALLED_NEW_BIN=1" >> "${NE_STATE}.tmp"
fi
else
log "已安装满足要求的 1.6.x,跳过二进制安装"
fi
ensure_custom_env_exists
write_required_env
write_systemd_unit
ensure_permissions
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] 将启动并启用 systemd 服务 node_exporter"
else
start_enable_service
fi
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] 将应用防火墙限制: 仅 ${PROM_IP} -> ${PROM_PORT}/tcp"
else
apply_firewall
fi
if [[ "${DRY_RUN}" == "true" ]]; then
echo "[DRY-RUN] 自检跳过"
finalize_state
log "DRY-RUN 完成(未对系统做实际改动)"
exit 0
fi
if ! self_check; then
error "自检失败,执行回滚"
rollback
exit 1
fi
finalize_state
log "安装/配置完成"
}
main_remote
REMOTE_EOF
}
run_remote_host() {
local host="$1"
log "====== 处理主机 ${host} ======"
if [[ "${MODE}" == "local" ]]; then
# 本地执行(忽略 SSH)
PROM_IP="${PROM_IP}" PROM_PORT="${PROM_PORT}" NODE_EXPORTER_VERSION="${NODE_EXPORTER_VERSION}" \
NET_IF="${NET_IF}" LOG_FILE="${LOG_FILE}" DOWNLOAD_URL="${DOWNLOAD_URL}" DRY_RUN="${DRY_RUN}" \
bash -s <<'EOF'
$(remote_payload)
EOF
return $?
fi
# 远程执行,通过 sudo 切到 root
if [[ "${DRY_RUN}" == "true" ]]; then
log "[DRY-RUN] 将通过 SSH 连接 ${SSH_USER}@${host}:${SSH_PORT}"
fi
# 将变量以环境导出给远端
local envs="PROM_IP='${PROM_IP}' PROM_PORT='${PROM_PORT}' NODE_EXPORTER_VERSION='${NODE_EXPORTER_VERSION}' NET_IF='${NET_IF}' LOG_FILE='${LOG_FILE}' DOWNLOAD_URL='${DOWNLOAD_URL}' DRY_RUN='${DRY_RUN}'"
ssh -p "${SSH_PORT}" ${SSH_OPTS} "${SSH_USER}@${host}" "sudo env ${envs} bash -s" <<'EOF'
$(remote_payload)
EOF
}
main() {
parse_args "$@"
if [[ "${GENERATE_RULES}" == "true" ]]; then
generate_rules_yaml
exit 0
fi
if [[ "${MODE}" != "remote" && "${MODE}" != "local" ]]; then
error "--mode 必须为 remote 或 local"
exit 2
fi
IFS=',' read -r -a hosts <<< "${TARGETS}"
if [[ "${MODE}" == "local" ]]; then
# 本地模式只处理本机,忽略 hosts
run_remote_host "127.0.0.1"
exit $?
fi
# 远程模式,多主机串行处理(可根据需要改为并行)
overall_rc=0
for h in "${hosts[@]}"; do
h_trim="$(echo "$h" | xargs)"
[[ -z "$h_trim" ]] && continue
if ! run_remote_host "$h_trim"; then
overall_rc=1
warn "主机 ${h_trim} 处理失败"
else
log "主机 ${h_trim} 完成"
fi
done
exit $overall_rc
}
main "$@"
以下为三种典型使用场景(假设脚本保存为 install_node_exporter.sh):

批量在 prod-web 集群三台主机上安装并加固,允许 10.0.20.5 拉取:
./install_node_exporter.sh -H "10.0.12.15,10.0.12.16,10.0.12.17" --prom-ip 10.0.20.5

在单台主机本地执行(例如登录到 10.0.12.15 后执行):
sudo ./install_node_exporter.sh -m local --prom-ip 10.0.20.5

生成 Prometheus 告警规则文件,在 Prometheus 服务器 10.0.20.5 上使用:
./install_node_exporter.sh --generate-rules > node.rules.yml
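安装完成后可按以下命令快速验证效果(示例命令,主机与端口请按实际环境替换):

# 目标主机上:确认服务运行且端口监听
systemctl is-active node_exporter
ss -lntp | grep ":9100"

# Prometheus 服务器(10.0.20.5)上:确认指标端点可达
curl -s http://10.0.12.15:9100/metrics | head -n 5

# 其它主机上:访问应被防火墙拦截(预期超时或拒绝)
curl -s --max-time 3 http://10.0.12.15:9100/metrics || echo "已按预期被拦截"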
使用注意事项:
- 执行前的准备:确认目标主机可 SSH 且可 sudo,具备 systemd 与 curl/wget;生产变更前建议先以 --dry-run 预演。
- 防火墙与最小权限:仅放行 10.0.20.5 访问 9100/tcp(依次尝试 firewalld/ufw/iptables);服务以专用 monitor 系统用户运行,systemd 单元启用 NoNewPrivileges、ProtectSystem 等加固项。
- 配置与幂等:自定义参数写入 /etc/node_exporter/flags.env 且不会被覆盖;脚本强制过滤参数独立存放于 ops-required.env,脚本可安全地重复执行。
- 自检与回滚:安装后自检本机 /metrics 端点,失败时自动回滚二进制、unit、环境文件与防火墙变更。
- 性能与资源:node_exporter 自身开销低;已排除 tmpfs 与 docker/snap 挂载点,网卡仅采集 eth0,进一步减少指标量。
- Prometheus 侧:在 10.0.20.5 配置抓取目标,并引用 --generate-rules 生成的告警规则(配置示例见下)。
- 安全与合规:安装日志写入 /var/log/ops/monitor_install.log,由 logrotate 保留 7 天;脚本不包含任何硬编码凭证。
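以下给出 Prometheus 侧的一个最小配置示例(file_sd 目标文件与规则文件路径均为示例假设,需与实际 prometheus.yml 中的 file_sd_configs、rule_files 配置对应;脚本文件名沿用上文假设):

# 在 10.0.20.5 上以 file_sd 方式下发抓取目标(假设 job "node" 已配置 file_sd_configs 指向该文件)
sudo mkdir -p /etc/prometheus/targets
sudo tee /etc/prometheus/targets/node.yml >/dev/null <<'YAML'
- targets:
    - 10.0.12.15:9100
    - 10.0.12.16:9100
    - 10.0.12.17:9100
  labels:
    env: prod-web
YAML

# 告警规则:将 --generate-rules 的输出放到 rule_files 引用的路径
./install_node_exporter.sh --generate-rules | sudo tee /etc/prometheus/node.rules.yml >/dev/null

# 重载 Prometheus 使配置生效(也可向进程发送 SIGHUP)
sudo systemctl reload prometheus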
<#
.SYNOPSIS
IIS W3C日志近24小时过滤与Top20统计,导出CSV与报告,并打包ZIP与保留7天。
.DESCRIPTION
- 自动处理原始与压缩日志(.log/.zip/.gz),只读访问原始文件。
- 过滤条件:sc-status >= StatusThreshold(默认500) OR time-taken >= TimeTakenMsThreshold(默认3000)。
- 时间范围:近 TimeWindowHours 小时(默认24小时),默认按UTC解析W3C时间(可切换为本地)。
- 导出:明细CSV、Top20摘要CSV、报告txt;目录ZIP打包;7天清理历史。
- 兼容UTF-8(含BOM)/ANSI(常见ASCII子集),可指定默认编码名。
.NOTES
作者:运维脚本开发专家
#>
[CmdletBinding()]
param(
# 日志目录(可多路径)
[Parameter(Mandatory=$false)]
[string[]]$LogDirectories = @(
'C:\inetpub\logs\LogFiles\W3SVC1',
'C:\inetpub\logs\LogFiles\W3SVC2'
),
# 输出根目录(按日期建立子目录)
[Parameter(Mandatory=$false)]
[string]$OutputRoot = 'D:\ops\log_analysis',
# 时间窗口(小时)
[Parameter(Mandatory=$false)]
[int]$TimeWindowHours = 24,
# 过滤阈值
[Parameter(Mandatory=$false)]
[int]$StatusThreshold = 500,
[Parameter(Mandatory=$false)]
[int]$TimeTakenMsThreshold = 3000,
# Top N
[Parameter(Mandatory=$false)]
[int]$TopN = 20,
# 假定日志时间为UTC(W3C默认为UTC)。若IIS配置为本地时间,请设为$false
[Parameter(Mandatory=$false)]
[bool]$AssumeUtc = $true,
# 解压及临时目录
[Parameter(Mandatory=$false)]
[string]$TempExtractRoot = "$env:TEMP\iis_log_extract",
# 文件查找回溯天数(减少扫描无关文件,实际仍按时间字段再过滤)
[Parameter(Mandatory=$false)]
[int]$LookbackDaysForFiles = 3,
# 默认编码名(当无BOM时使用);常见可选:'utf-8','windows-1252','gb18030'
[Parameter(Mandatory=$false)]
[string]$DefaultEncodingName = 'utf-8',
# 结果保留天数
[Parameter(Mandatory=$false)]
[int]$RetentionDays = 7,
# 是否执行保留策略清理(仅限OutputRoot内产物)
[Parameter(Mandatory=$false)]
[bool]$EnforceRetention = $true
)
begin {
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
# 准备输出目录
$dateTag = (Get-Date -Format 'yyyyMMdd_HHmm')
$todayTag = (Get-Date -Format 'yyyyMMdd')
$OutputDir = Join-Path $OutputRoot $dateTag
if (-not (Test-Path $OutputDir)) { New-Item -ItemType Directory -Path $OutputDir | Out-Null }
# 加载压缩库
Add-Type -AssemblyName System.IO.Compression.FileSystem
# 日志记录函数
$LogFile = Join-Path $OutputDir "run.log"
function Write-Log {
param([string]$Message, [string]$Level = 'INFO')
$ts = Get-Date -Format 'yyyy-MM-dd HH:mm:ss'
$line = "[$ts][$Level] $Message"
$line | Tee-Object -FilePath $LogFile -Append
}
Write-Log "脚本启动:IIS日志筛选与统计"
Write-Log "参数:Dirs=$($LogDirectories -join ';'), OutputRoot=$OutputRoot, Window=${TimeWindowHours}h, AssumeUtc=$AssumeUtc, Thresholds: sc-status>=$StatusThreshold OR time-taken>=$TimeTakenMsThreshold ms"
# 计算时间窗口
if ($AssumeUtc) {
$windowEndUtc = (Get-Date).ToUniversalTime()
$windowStartUtc = $windowEndUtc.AddHours(-$TimeWindowHours)
} else {
$windowEndLocal = (Get-Date)
$windowStartLocal = $windowEndLocal.AddHours(-$TimeWindowHours)
}
# 服务器名
$ServerName = $env:COMPUTERNAME
# 临时解压目录
$SessionTempDir = Join-Path $TempExtractRoot $dateTag
if (-not (Test-Path $SessionTempDir)) { New-Item -ItemType Directory -Path $SessionTempDir | Out-Null }
# 读文件的StreamReader(自动BOM检测,失败时用默认编码)
function New-StreamReader {
param([string]$Path)
$fs = [System.IO.File]::Open($Path, [System.IO.FileMode]::Open, [System.IO.FileAccess]::Read, [System.IO.FileShare]::ReadWrite)
try {
# 首选UTF-8 + BOM检测
$enc = [System.Text.Encoding]::GetEncoding($DefaultEncodingName)
} catch {
$enc = [System.Text.Encoding]::UTF8
}
return New-Object System.IO.StreamReader($fs, $enc, $true, 4096, $false)
}
# 解压.zip/.gz返回.log文件路径列表
function Expand-LogArchive {
param([string]$ArchivePath, [string]$DestRoot)
$ext = [System.IO.Path]::GetExtension($ArchivePath).ToLowerInvariant()
$result = @()
if ($ext -eq '.zip') {
$dest = Join-Path $DestRoot ([IO.Path]::GetFileNameWithoutExtension($ArchivePath))
if (-not (Test-Path $dest)) { New-Item -ItemType Directory -Path $dest | Out-Null }
        # 兼容 Windows PowerShell 5.1:三参数 overwrite 重载在 .NET Framework 中不存在;目标目录随会话唯一,两参数版本即可
        [System.IO.Compression.ZipFile]::ExtractToDirectory($ArchivePath, $dest)
$result += Get-ChildItem -LiteralPath $dest -Recurse -File -Include *.log | Select-Object -ExpandProperty FullName
} elseif ($ext -eq '.gz' -or $ext -eq '.gzip') {
$baseName = [IO.Path]::GetFileNameWithoutExtension($ArchivePath)
$dest = Join-Path $DestRoot $baseName
if (-not (Test-Path $dest)) { New-Item -ItemType Directory -Path $dest | Out-Null }
        # 注意运算符优先级:-replace 的替换值需先用括号收敛,否则 '' + '.log' 会被整体当作替换值
        $outFile = Join-Path $dest (($baseName -replace '\.log$','') + '.log')
try {
$inStream = [System.IO.File]::OpenRead($ArchivePath)
try {
$gzip = New-Object System.IO.Compression.GzipStream($inStream, [IO.Compression.CompressionMode]::Decompress)
$outStream = [System.IO.File]::Create($outFile)
try {
$gzip.CopyTo($outStream)
} finally {
$outStream.Close()
$outStream.Dispose()
$gzip.Close()
$gzip.Dispose()
}
} finally {
$inStream.Close()
$inStream.Dispose()
}
if (Test-Path $outFile) { $result += $outFile }
} catch {
Write-Log "解压GZ失败:$ArchivePath => $($_.Exception.Message)" "WARN"
}
}
return $result
}
# 解析W3C日志文件,返回满足时间窗口与条件的明细项;同时更新聚合
$Aggregations = @{} # key: "uri|status|ip" => [hashtable] Count, SumTime, MaxTime
$Details = New-Object System.Collections.Generic.List[object]
$TotalLines = 0
$ScannedFiles = 0
$MatchedLines = 0
$FilesProcessed = @()
function Parse-W3C-File {
param(
[string]$FilePath,
[bool]$AssumeUtc
)
    $script:ScannedFiles++  # 函数内 ++ 会新建局部变量,需显式写脚本作用域
$fieldsMap = @{}
$hasFields = $false
$lineNo = 0
try {
$reader = New-StreamReader -Path $FilePath
} catch {
Write-Log "打开文件失败(跳过):$FilePath => $($_.Exception.Message)" "WARN"
return
}
try {
while (-not $reader.EndOfStream) {
$line = $reader.ReadLine()
$lineNo++
if ([string]::IsNullOrWhiteSpace($line)) { continue }
if ($line.StartsWith('#')) {
if ($line.StartsWith('#Fields:', [System.StringComparison]::OrdinalIgnoreCase)) {
$hasFields = $true
$fieldsMap.Clear()
# 例:#Fields: date time s-sitename s-computername s-ip cs-method ...
$fieldsLine = $line.Substring(8).Trim()
$fields = $fieldsLine -split '\s+'
for ($i=0; $i -lt $fields.Length; $i++) {
$fieldsMap[$fields[$i]] = $i
}
}
continue
}
if (-not $hasFields) {
# 未出现#Fields:无法解析,跳过
continue
}
$parts = $line -split '\s+'
# 安全检查
if ($parts.Count -lt $fieldsMap.Count) { continue }
# 读取关键字段(不存在则为缺省)
$get = {
param($name)
if ($fieldsMap.ContainsKey($name)) {
$idx = $fieldsMap[$name]
if ($idx -lt $parts.Count) { return $parts[$idx] }
}
return $null
}
$dateStr = & $get 'date'
$timeStr = & $get 'time'
if ([string]::IsNullOrEmpty($dateStr) -or [string]::IsNullOrEmpty($timeStr)) { continue }
# 解析时间
try {
$style = if ($AssumeUtc) { [System.Globalization.DateTimeStyles]::AssumeUniversal } else { [System.Globalization.DateTimeStyles]::None }
$dt = [datetime]::ParseExact("$dateStr $timeStr", 'yyyy-MM-dd HH:mm:ss', [System.Globalization.CultureInfo]::InvariantCulture, $style)
if ($AssumeUtc -and $dt.Kind -ne [DateTimeKind]::Utc) {
$dt = [datetime]::SpecifyKind($dt, [DateTimeKind]::Utc)
}
} catch {
continue
}
# 时间窗口判断
$inWindow = $false
if ($AssumeUtc) {
if ($dt -ge $windowStartUtc -and $dt -le $windowEndUtc) { $inWindow = $true }
} else {
# 解析为本地时间
if ($dt -ge $windowStartLocal -and $dt -le $windowEndLocal) { $inWindow = $true }
}
if (-not $inWindow) { continue }
            $script:TotalLines++
# 拉取其他字段
$uri = (& $get 'cs-uri-stem'); if ([string]::IsNullOrEmpty($uri)) { $uri = '-' }
$cip = (& $get 'c-ip'); if ([string]::IsNullOrEmpty($cip)) { $cip = '-' }
$status = (& $get 'sc-status'); if ([string]::IsNullOrEmpty($status)) { $status = '0' }
$tt = (& $get 'time-taken'); if ([string]::IsNullOrEmpty($tt)) { $tt = '0' }
# 转型
            $statusInt = 0
            [int]::TryParse($status, [ref]$statusInt) | Out-Null
            $ttInt = 0
            [int]::TryParse($tt, [ref]$ttInt) | Out-Null
# 过滤条件
if (($statusInt -ge $StatusThreshold) -or ($ttInt -ge $TimeTakenMsThreshold)) {
                $script:MatchedLines++
$obj = [pscustomobject]@{
server = $ServerName
file = $FilePath
timestamp = if ($AssumeUtc) { $dt.ToLocalTime().ToString('yyyy-MM-dd HH:mm:ss') + ' (Local)' } else { $dt.ToString('yyyy-MM-dd HH:mm:ss') }
date = $dateStr
time = $timeStr
'cs-uri-stem' = $uri
'c-ip' = $cip
'sc-status' = $statusInt
'time-taken-ms' = $ttInt
'cs-method' = (& $get 'cs-method')
'cs-host' = (& $get 'cs-host')
'sc-substatus' = (& $get 'sc-substatus')
'sc-win32-status'= (& $get 'sc-win32-status')
'cs(Referer)' = (& $get 'cs(Referer)')
'cs(User-Agent)' = (& $get 'cs(User-Agent)')
}
$Details.Add($obj) | Out-Null
# 聚合
$key = "$uri|$statusInt|$cip"
if (-not $Aggregations.ContainsKey($key)) {
$Aggregations[$key] = [ordered]@{
'cs-uri-stem' = $uri
'sc-status' = $statusInt
'c-ip' = $cip
Count = 0
SumTime = 0
MaxTime = 0
}
}
$agg = $Aggregations[$key]
$agg['Count']++
$agg['SumTime'] += $ttInt
if ($ttInt -gt $agg['MaxTime']) { $agg['MaxTime'] = $ttInt }
}
}
} catch {
Write-Log "解析文件异常:$FilePath => $($_.Exception.Message)" "WARN"
} finally {
$reader.Close()
$reader.Dispose()
}
}
}
process {
# 收集候选文件(优化:只看近LookbackDaysForFiles修改过的)
$since = (Get-Date).AddDays(-$LookbackDaysForFiles)
$candidateFiles = @()
foreach ($dir in $LogDirectories) {
if (-not (Test-Path $dir)) {
Write-Log "日志目录不存在(跳过):$dir" "WARN"
continue
}
        # 原始log(-Include 需配合通配路径或 -Recurse 才生效,这里改用 -Filter,并展开为全路径字符串)
        $candidateFiles += Get-ChildItem -LiteralPath $dir -File -Filter '*.log' -ErrorAction SilentlyContinue |
            Where-Object { $_.LastWriteTime -ge $since } | Select-Object -ExpandProperty FullName
        # 压缩包(按扩展名过滤)
        $archives = Get-ChildItem -LiteralPath $dir -File -ErrorAction SilentlyContinue |
            Where-Object { ($_.Extension -in '.zip','.gz','.gzip') -and $_.LastWriteTime -ge $since }
foreach ($a in $archives) {
try {
$expanded = Expand-LogArchive -ArchivePath $a.FullName -DestRoot $SessionTempDir
if ($expanded.Count -gt 0) { $candidateFiles += $expanded }
Write-Log "解压:$($a.FullName) => $($expanded.Count) 个.log"
} catch {
Write-Log "解压失败:$($a.FullName) => $($_.Exception.Message)" "WARN"
}
}
}
    $candidateFiles = @($candidateFiles | Select-Object -Unique)  # 强制为数组,避免严格模式下 .Count 报错
Write-Log "候选日志文件数:$($candidateFiles.Count)"
foreach ($f in $candidateFiles) {
try {
Parse-W3C-File -FilePath $f -AssumeUtc:$AssumeUtc
$FilesProcessed += $f
} catch {
Write-Log "文件处理异常:$f => $($_.Exception.Message)" "WARN"
}
}
# 生成结果
$detailCsv = Join-Path $OutputDir "details_$($todayTag).csv"
$summaryCsv = Join-Path $OutputDir "summary_top$TopN`_$($todayTag).csv"
$reportTxt = Join-Path $OutputDir "report_$($todayTag).txt"
# 明细导出
$Details | Export-Csv -NoTypeInformation -Path $detailCsv -Encoding UTF8
# 聚合并TopN
$Summary = foreach ($kv in $Aggregations.GetEnumerator()) {
$val = $kv.Value
[pscustomobject]@{
'cs-uri-stem' = $val['cs-uri-stem']
'sc-status' = $val['sc-status']
'c-ip' = $val['c-ip']
Count = $val['Count']
AvgTimeTaken_ms = [math]::Round(($val['SumTime'] / [Math]::Max(1,$val['Count'])),2)
MaxTimeTaken_ms = $val['MaxTime']
}
}
$TopSummary = $Summary | Sort-Object -Property @{Expression='Count';Descending=$true}, @{Expression='AvgTimeTaken_ms';Descending=$true} | Select-Object -First $TopN
$TopSummary | Export-Csv -NoTypeInformation -Path $summaryCsv -Encoding UTF8
# 生成报告
$totalFiles = $FilesProcessed.Count
$totalCandidates = $candidateFiles.Count
$distinctUris = ($Details | Select-Object -ExpandProperty 'cs-uri-stem' -Unique | Measure-Object).Count
$distinctIPs = ($Details | Select-Object -ExpandProperty 'c-ip' -Unique | Measure-Object).Count
$timeWindowDesc = if ($AssumeUtc) {
"UTC: $($windowStartUtc.ToString('yyyy-MM-dd HH:mm:ss')) ~ $($windowEndUtc.ToString('yyyy-MM-dd HH:mm:ss'))(以本地显示:近${TimeWindowHours}小时)"
} else {
"Local: $($windowStartLocal.ToString('yyyy-MM-dd HH:mm:ss')) ~ $($windowEndLocal.ToString('yyyy-MM-dd HH:mm:ss'))(近${TimeWindowHours}小时)"
}
    $topPreview = @($TopSummary) | Select-Object -First ([Math]::Min(10, @($TopSummary).Count))
$topLines = @("Top项预览(最多10条):")
foreach ($t in $topPreview) {
$topLines += (" - {0} | sc-status={1} | c-ip={2} | Count={3} | Avg={4}ms | Max={5}ms" -f $t.'cs-uri-stem', $t.'sc-status', $t.'c-ip', $t.Count, $t.AvgTimeTaken_ms, $t.MaxTimeTaken_ms)
}
@"
IIS 日志分析报告
================
服务器:$ServerName
时间窗口:$timeWindowDesc
日志目录:$(($LogDirectories -join '; '))
候选文件数:$totalCandidates(实际处理:$totalFiles)
扫描总行数:$TotalLines
匹配行数(sc-status>=$StatusThreshold 或 time-taken>=$TimeTakenMsThreshold):$MatchedLines
Distinct URI:$distinctUris
Distinct IP:$distinctIPs
导出文件:
- 明细CSV:$detailCsv
- Top$TopN 摘要CSV:$summaryCsv
$($topLines -join "`r`n")
"@ | Set-Content -Path $reportTxt -Encoding UTF8
Write-Log "明细已导出:$detailCsv"
Write-Log "摘要已导出:$summaryCsv"
Write-Log "报告已生成:$reportTxt"
# 打包ZIP
$zipPath = "$OutputDir.zip"
if (Test-Path $zipPath) { Remove-Item -LiteralPath $zipPath -Force }
Compress-Archive -Path (Join-Path $OutputDir '*') -DestinationPath $zipPath -Force
Write-Log "结果已打包:$zipPath"
# 清理历史(仅OutputRoot下)
if ($EnforceRetention -and (Test-Path $OutputRoot)) {
$deadline = (Get-Date).AddDays(-$RetentionDays)
$toDelete = @()
$toDelete += Get-ChildItem -LiteralPath $OutputRoot -Directory -ErrorAction SilentlyContinue | Where-Object { $_.LastWriteTime -lt $deadline }
$toDelete += Get-ChildItem -LiteralPath $OutputRoot -File -Filter '*.zip' -ErrorAction SilentlyContinue | Where-Object { $_.LastWriteTime -lt $deadline }
foreach ($item in $toDelete) {
try {
Remove-Item -LiteralPath $item.FullName -Recurse -Force -ErrorAction Stop
Write-Log "已清理历史产物:$($item.FullName)"
} catch {
Write-Log "清理失败:$($item.FullName) => $($_.Exception.Message)" "WARN"
}
}
}
Write-Log "任务完成。"
}
end {
# 清理会话临时解压目录(不影响原日志)
try {
if (Test-Path $SessionTempDir) {
Remove-Item -LiteralPath $SessionTempDir -Recurse -Force -ErrorAction Stop
Write-Log "会话临时目录已清理:$SessionTempDir"
}
} catch {
Write-Log "清理临时目录失败(可忽略):$($_.Exception.Message)" "WARN"
}
}
参数说明:
- LogDirectories:日志目录,可多路径(默认 C:\inetpub\logs\LogFiles\W3SVC1、W3SVC2)
- OutputRoot:输出根目录,按日期建立子目录(默认 D:\ops\log_analysis)
- TimeWindowHours:时间窗口(小时,默认 24)
- StatusThreshold:状态码过滤阈值(默认 500)
- TimeTakenMsThreshold:耗时过滤阈值(毫秒,默认 3000)
- TopN:摘要条数(默认 20)
- AssumeUtc:是否按 UTC 解析 W3C 时间(默认 $true;IIS 配置为本地时间时设为 $false)
- TempExtractRoot:解压及临时目录(默认 %TEMP%\iis_log_extract)
- LookbackDaysForFiles:候选文件回溯天数(默认 3)
- DefaultEncodingName:无 BOM 时的默认编码名(默认 'utf-8',常见可选 'windows-1252'、'gb18030')
- RetentionDays:结果保留天数(默认 7)
- EnforceRetention:是否执行保留策略清理,仅限 OutputRoot 内(默认 $true)
指定本地时间模式(若IIS日志配置使用本地时间):
powershell -ExecutionPolicy Bypass -File .\IIS_Log_Analyzer.ps1 -AssumeUtc:$false -TimeWindowHours 24

每小时计划任务示例(从当前脚本路径每小时运行一次):
$action = New-ScheduledTaskAction -Execute 'powershell.exe' -Argument '-NoProfile -ExecutionPolicy Bypass -File "D:\ops\scripts\IIS_Log_Analyzer.ps1"'
$trigger = New-ScheduledTaskTrigger -Once -At (Get-Date).AddMinutes(2) -RepetitionInterval (New-TimeSpan -Hours 1) -RepetitionDuration ([TimeSpan]::MaxValue)
Register-ScheduledTask -TaskName 'IIS_Log_Analyzer_Hourly' -Action $action -Trigger $trigger -Description 'IIS日志分析每小时执行' -User 'SYSTEM' -RunLevel Limited

注:若需自定义输出目录或日志目录,可在 Argument 中加入 -OutputRoot 与 -LogDirectories 参数。
使用注意事项:
- 执行前的准备:确认运行账号对各日志目录有读取权限、对 OutputRoot 有写入权限;需 Windows PowerShell 5.1 或更高版本。
- 安全与合规:原始日志仅以只读共享方式打开;产物写入与历史清理均限定在 OutputRoot 内,不改动源日志。
- 兼容性与编码:自动检测 BOM,无 BOM 时按 DefaultEncodingName 解码;支持 .log 原始日志与 .zip/.gz 压缩日志。
- 性能影响与资源消耗:按 LookbackDaysForFiles 先收敛候选文件,再逐行流式解析,避免将大文件整体载入内存。
- 结果保留与清理:输出目录与 ZIP 超过 RetentionDays(默认 7 天)自动清理;会话临时解压目录在运行结束时删除。
如需将服务器名固定为 srv-gw01 或 srv-gw02,可自行扩展一个 -ServerName 参数用于报告汇总(脚本当前自动读取本机计算机名)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
op_backup.py - macOS 构建机增量/全量项目级备份与恢复工具
主要特性:
- 每周一次全量(自动判断距上次全量 >= 6 天或 --force-full),其余为增量
- 备份对象:/Users/build/Projects 下每个项目目录(一级子目录)与 /Users/build/.ssh
- 排除:**/DerivedData/**, **/Library/Caches/**, **/*.tmp, **/node_modules/**
- 备份输出:per-project tar.gz;写入 sha256 并立即校验
- 快照保留:最近 N(7) 天
- 恢复:指定日期(YYYYMMDD);dry-run;可保留权限与时间戳;处理删除集
- SMB 凭证:env BACKUP_USER/BACKUP_PASS;通过 AppleScript 安全地从 stdin 传参挂载
- 钩子:备份前停止 Agent,完成后恢复(支持 --agent-label 或自定义命令)
- 日志:/var/log/op_backup.log;失败重试 2 次(总 3 次尝试)
- 定时:支持安装 launchd plist(每日 02:30)
安全说明:
- 不在脚本中硬编码任何敏感信息;凭证从环境变量或外部 env 文件加载
- 删除与清理仅在目标备份根目录下进行,路径校验防误删
"""
import argparse
import datetime as dt
import fnmatch
import hashlib
import json
import logging
import os
import pathlib
import platform
import shutil
import socket
import subprocess
import sys
import tarfile
import tempfile
import time
from typing import Dict, List, Tuple, Iterable, Optional
# =========================
# 全局默认配置(可通过参数覆盖)
# =========================
DEFAULT_SOURCES = [
"/Users/build/Projects",
"/Users/build/.ssh",
]
DEFAULT_EXCLUDES = [
"**/DerivedData/**",
"**/Library/Caches/**",
"**/*.tmp",
"**/node_modules/**",
]
DEFAULT_SMB_URL = "smb://10.0.30.20/backup/build"
DEFAULT_MOUNT_POINT = "/Volumes/build_backup"
DEFAULT_LOG_PATH = "/var/log/op_backup.log"
DEFAULT_RETENTION_DAYS = 7
DEFAULT_FULL_INTERVAL_DAYS = 7 # 每周一次全量(≥6天未全量则全量)
DEFAULT_RETRIES = 2 # 失败重试次数
SCRIPT_LABEL = "com.ops.backup"
# =========================
# 日志
# =========================
def setup_logger(log_path: str) -> logging.Logger:
logger = logging.getLogger("op_backup")
logger.setLevel(logging.INFO)
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
# 文件日志
try:
fh = logging.FileHandler(log_path)
fh.setFormatter(fmt)
fh.setLevel(logging.INFO)
logger.addHandler(fh)
except Exception:
# 回退到 stdout
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(fmt)
logger.addHandler(sh)
logger.warning("无法写入日志文件 %s,已回退到标准输出", log_path)
return logger
logger = setup_logger(DEFAULT_LOG_PATH)
# =========================
# 工具函数
# =========================
def retry(max_retries: int, pause_seconds: float = 3.0):
def deco(func):
def wrapper(*args, **kwargs):
attempt = 0
last_exc = None
while attempt <= max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
last_exc = e
attempt += 1
if attempt > max_retries:
logger.error("函数 %s 最终失败:%s", func.__name__, e)
raise
logger.warning("函数 %s 失败(第 %d 次),%ss 后重试... 错误:%s",
func.__name__, attempt, pause_seconds, e)
time.sleep(pause_seconds)
return wrapper
return deco
def load_env_file(env_file: Optional[str]):
if not env_file:
return
p = pathlib.Path(env_file)
if not p.exists():
raise FileNotFoundError(f"env 文件不存在:{env_file}")
for line in p.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
os.environ[k.strip()] = v.strip()
def ensure_dir(path: pathlib.Path):
path.mkdir(parents=True, exist_ok=True)
def sha256_file(path: pathlib.Path, chunk: int = 2 * 1024 * 1024) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
while True:
b = f.read(chunk)
if not b:
break
h.update(b)
return h.hexdigest()
def is_mounted(mount_point: str) -> bool:
try:
out = subprocess.check_output(["mount"], text=True)
for line in out.splitlines():
if f" on {mount_point} " in line and "smbfs" in line:
return True
except Exception:
pass
return False
def safe_join(base: pathlib.Path, rel: str) -> pathlib.Path:
    # 防止路径穿越;用 parents 成员判断而非字符串前缀,避免 /a/b 误匹配 /a/bc
    base_resolved = base.resolve()
    final = (base_resolved / rel).resolve()
    if final != base_resolved and base_resolved not in final.parents:
        raise ValueError(f"越界路径:{rel}")
    return final
def date_str(d: Optional[dt.date] = None) -> str:
d = d or dt.date.today()
return d.strftime("%Y%m%d")
def list_subdirs(path: pathlib.Path) -> List[pathlib.Path]:
return [p for p in path.iterdir() if p.is_dir()]
def parse_date_dir_name(name: str) -> Optional[dt.date]:
try:
return dt.datetime.strptime(name, "%Y%m%d").date()
except Exception:
return None
def now_local() -> dt.datetime:
return dt.datetime.now()
def hostname() -> str:
try:
return socket.gethostname()
except Exception:
return platform.node() or "unknown-host"
# =========================
# 排除规则
# =========================
def normalize_rel(path: pathlib.Path, base: pathlib.Path) -> str:
rel = path.relative_to(base).as_posix()
return rel if rel != "." else ""
def should_exclude(rel_path: str, exclude_patterns: List[str]) -> bool:
    # rel_path 使用 posix 风格
    if not rel_path:
        return False
    # 同时生成带 "/" 前缀与后缀的候选,使 **/dir/** 这类模式也能命中顶层目录/文件
    rel = rel_path.rstrip("/")
    candidates = {rel, rel + "/", "/" + rel, "/" + rel + "/"}
    for pat in exclude_patterns:
        if any(fnmatch.fnmatch(c, pat) for c in candidates):
            return True
    return False
# =========================
# SMB 挂载/卸载
# =========================
class SMBMounter:
def __init__(self, smb_url: str, mount_point: str):
self.smb_url = smb_url
self.mount_point = mount_point
def mount(self, user: Optional[str], password: Optional[str]):
mp = pathlib.Path(self.mount_point)
ensure_dir(mp)
if is_mounted(self.mount_point):
logger.info("SMB 已挂载:%s", self.mount_point)
return
if not user or not password:
raise RuntimeError("挂载 SMB 需要 BACKUP_USER/BACKUP_PASS 环境变量或 --env-file 提供")
        # 通过 AppleScript 从 stdin 传参,避免密码出现在命令行;对引号与反斜杠做转义防止破坏脚本
        as_user = user.replace("\\", "\\\\").replace('"', '\\"')
        as_pass = password.replace("\\", "\\\\").replace('"', '\\"')
        applescript = f'''
on run
    mount volume "{self.smb_url}" as user name "{as_user}" with password "{as_pass}"
end run
'''
        try:
            logger.info("尝试挂载 SMB:%s -> %s", self.smb_url, self.mount_point)
            # 注:check_call 不接受 input 参数,这里用 subprocess.run 经 stdin 传入脚本
            subprocess.run(["osascript", "-"], input=applescript.encode("utf-8"), check=True)
# 等待 Finder 挂载到 /Volumes/<ShareName>,然后将其绑定/链接到指定 mount_point
time.sleep(2.0)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"SMB 挂载失败:{e}")
# macOS Finder 通常挂载到 /Volumes/<ShareName>,但我们需要稳定路径:
# 如果用户指定了自定义 mount_point,尝试将其软链接到 Finder 挂载点。
if not is_mounted(self.mount_point):
# 查找最新 smbfs 挂载点
out = subprocess.check_output(["mount"], text=True)
mounted = None
for line in out.splitlines():
if "smbfs on /Volumes/" in line:
mounted = line.split(" on ")[1].split(" (")[0].strip()
break
            if mounted:
                try:
                    # mount() 开头的 ensure_dir 可能已创建空目录;需先移除同名路径才能建立软链接
                    if mp.is_symlink():
                        mp.unlink()
                    elif mp.is_dir():
                        mp.rmdir()  # 仅空目录可移除;非空目录会抛错并进入下方 warning
                    elif mp.exists():
                        raise RuntimeError(f"mount_point 不是目录:{mp}")
                    ensure_dir(mp.parent)
                    os.symlink(mounted, mp)
                    logger.info("SMB 已通过符号链接映射到 %s -> %s", mp, mounted)
                except Exception as e:
                    logger.warning("创建符号链接失败:%s", e)
if not is_mounted(self.mount_point):
logger.warning("无法确认 %s 已挂载,但 Finder 可能已挂载成功。", self.mount_point)
def unmount(self):
if not is_mounted(self.mount_point):
return
try:
logger.info("卸载 SMB:%s", self.mount_point)
subprocess.check_call(["diskutil", "unmount", "force", self.mount_point])
except Exception as e:
logger.warning("卸载失败(忽略):%s", e)
# =========================
# 备份/恢复核心
# =========================
class BackupManager:
def __init__(
self,
sources: List[str],
exclude_patterns: List[str],
mount_point: str,
smb_url: str,
retention_days: int,
full_interval_days: int,
agent_label: Optional[str] = None,
pre_stop_cmd: Optional[str] = None,
post_start_cmd: Optional[str] = None,
host_name: Optional[str] = None,
):
self.sources = [pathlib.Path(s).resolve() for s in sources]
self.exclude_patterns = exclude_patterns
self.mount_point = pathlib.Path(mount_point)
self.smb_url = smb_url
self.retention_days = retention_days
self.full_interval_days = full_interval_days
self.agent_label = agent_label
self.pre_stop_cmd = pre_stop_cmd
self.post_start_cmd = post_start_cmd
self.host_name = host_name or hostname()
# 备份目录结构:
# /Volumes/build_backup/<host>/
# snapshots/YYYYMMDD/
# TYPE: FULL | INCR
# <project>-<type>-YYYYMMDD.tar.gz
# <project>-<type>-YYYYMMDD.tar.gz.sha256
# deletions-<project>-YYYYMMDD.txt (增量时可能存在)
# meta/manifests/<project>.json (最新清单)
self.base_dir = self.mount_point / self.host_name
self.snapshots_dir = self.base_dir / "snapshots"
self.meta_dir = self.base_dir / "meta" / "manifests"
def _project_list(self) -> List[Tuple[str, pathlib.Path]]:
projects: List[Tuple[str, pathlib.Path]] = []
for src in self.sources:
if not src.exists():
logger.warning("源目录不存在:%s(跳过)", src)
continue
if src.name == ".ssh" and src.is_dir():
projects.append(("ssh", src))
continue
if src.name == "Projects" and src.is_dir():
for child in sorted(src.iterdir()):
if child.is_dir():
projects.append((child.name, child))
else:
# 其它单独目录作为一个项目
projects.append((src.name, src))
return projects
def _load_manifest(self, project: str) -> Dict:
p = self.meta_dir / f"{project}.json"
if not p.exists():
return {"project": project, "files": {}, "timestamp": ""}
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
logger.warning("读取 manifest 失败:%s(重置为空)", p)
return {"project": project, "files": {}, "timestamp": ""}
def _save_manifest(self, project: str, manifest: Dict):
ensure_dir(self.meta_dir)
p = self.meta_dir / f"{project}.json"
p.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
def _walk_files(self, base: pathlib.Path) -> Iterable[Tuple[str, os.stat_result]]:
# 遍历文件并应用排除规则;返回 (rel_path, stat)
for root, dirs, files in os.walk(base, followlinks=False):
root_p = pathlib.Path(root)
# 目录级排除(修改 dirs 就能剪枝)
keep_dirs = []
for d in dirs:
rel = normalize_rel(root_p / d, base)
if should_exclude(rel + "/", self.exclude_patterns):
continue
keep_dirs.append(d)
dirs[:] = keep_dirs
# 文件
for f in files:
p = root_p / f
rel = normalize_rel(p, base)
if should_exclude(rel, self.exclude_patterns):
continue
try:
st = p.lstat()
except FileNotFoundError:
continue
yield rel, st
def _scan_current_state(self, base: pathlib.Path) -> Dict[str, Dict]:
state: Dict[str, Dict] = {}
for rel, st in self._walk_files(base):
state[rel] = {"mtime": st.st_mtime, "size": st.st_size}
return state
def _compute_delta(
self, prev: Dict[str, Dict], current: Dict[str, Dict]
) -> Tuple[List[str], List[str]]:
changed: List[str] = []
for rel, meta in current.items():
pv = prev.get(rel)
if not pv or pv.get("mtime") != meta["mtime"] or pv.get("size") != meta["size"]:
changed.append(rel)
deletions: List[str] = []
for rel in prev:
if rel not in current:
# 只有当此前文件不属于排除时才记录删除(避免因排除导致虚假删除)
if not should_exclude(rel, self.exclude_patterns):
deletions.append(rel)
return sorted(changed), sorted(deletions)
def _need_full(self) -> bool:
# 若最近一次 FULL 距今 >= full_interval_days-1 则做全量;否则增量
if not self.snapshots_dir.exists():
return True
latest_full_date: Optional[dt.date] = None
for d in list_subdirs(self.snapshots_dir):
ds = parse_date_dir_name(d.name)
if not ds:
continue
type_file = d / "TYPE"
if type_file.exists():
try:
t = type_file.read_text(encoding="utf-8").strip().upper()
if t == "FULL":
if (latest_full_date is None) or (ds > latest_full_date):
latest_full_date = ds
except Exception:
continue
if latest_full_date is None:
return True
delta_days = (dt.date.today() - latest_full_date).days
return delta_days >= (self.full_interval_days - 1)
def _snapshot_dir(self, day: dt.date) -> pathlib.Path:
return self.snapshots_dir / day.strftime("%Y%m%d")
def _write_type_marker(self, snap_dir: pathlib.Path, snap_type: str):
(snap_dir / "TYPE").write_text(snap_type.upper(), encoding="utf-8")
def _archive_project(
self,
project: str,
base_dir: pathlib.Path,
rel_list: List[str],
snap_dir: pathlib.Path,
snap_type: str,
day: dt.date,
) -> Tuple[pathlib.Path, pathlib.Path]:
ensure_dir(snap_dir)
tar_name = f"{project}-{snap_type.lower()}-{day.strftime('%Y%m%d')}.tar.gz"
tar_path = snap_dir / tar_name
sha_path = snap_dir / f"{tar_name}.sha256"
logger.info("打包 %s(%s,%d 个条目)-> %s", project, snap_type, len(rel_list), tar_path)
# 创建 tar.gz
with tarfile.open(tar_path, mode="w:gz", format=tarfile.PAX_FORMAT, compresslevel=6) as tf:
for rel in rel_list:
abs_path = safe_join(base_dir, rel)
# 加入档案,保留相对路径结构
try:
tf.add(abs_path, arcname=rel, recursive=False)
except FileNotFoundError:
# 避免并发写导致的短暂缺失
logger.warning("文件消失(忽略):%s", abs_path)
# 计算 sha256 并写入
digest = sha256_file(tar_path)
sha_path.write_text(f"{digest} {tar_name}\n", encoding="utf-8")
# 立刻重新校验一次
digest2 = sha256_file(tar_path)
if digest != digest2:
raise RuntimeError(f"SHA256 校验不一致:{tar_path}")
logger.info("归档与校验完成:%s (sha256=%s)", tar_path.name, digest)
return tar_path, sha_path
def _write_deletions(
self, project: str, deletions: List[str], snap_dir: pathlib.Path, day: dt.date
) -> Optional[pathlib.Path]:
if not deletions:
return None
path = snap_dir / f"deletions-{project}-{day.strftime('%Y%m%d')}.txt"
path.write_text("\n".join(deletions) + "\n", encoding="utf-8")
logger.info("记录删除集:%s(%d 条)", path.name, len(deletions))
return path
def _run_hook_stop(self):
if self.pre_stop_cmd:
logger.info("执行预停止命令:%s", self.pre_stop_cmd)
subprocess.check_call(self.pre_stop_cmd, shell=True)
elif self.agent_label:
# 尝试停止构建 Agent(可能需 root)
logger.info("停止构建 Agent(launchctl stop):%s", self.agent_label)
subprocess.call(["launchctl", "stop", self.agent_label])
else:
logger.info("未配置停止钩子,跳过")
def _run_hook_start(self):
if self.post_start_cmd:
logger.info("执行恢复命令:%s", self.post_start_cmd)
subprocess.check_call(self.post_start_cmd, shell=True)
elif self.agent_label:
logger.info("恢复构建 Agent(launchctl start):%s", self.agent_label)
subprocess.call(["launchctl", "start", self.agent_label])
else:
logger.info("未配置恢复钩子,跳过")
def _cleanup_retention(self):
if not self.snapshots_dir.exists():
return
entries = list_subdirs(self.snapshots_dir)
items: List[Tuple[dt.date, pathlib.Path]] = []
for d in entries:
ds = parse_date_dir_name(d.name)
if ds:
items.append((ds, d))
items.sort()
# 保留最近 retention_days 天
keep_after = (dt.date.today() - dt.timedelta(days=self.retention_days - 1))
removed = 0
for ds, d in items:
if ds < keep_after:
logger.info("清理超期快照:%s", d)
shutil.rmtree(d, ignore_errors=True)
removed += 1
if removed:
logger.info("保留策略执行完成,删除 %d 个快照目录", removed)
@retry(max_retries=DEFAULT_RETRIES, pause_seconds=3.0)
def backup_once(self, force_full: bool = False):
        # 先挂载 SMB,再创建目录;否则目录会写到尚未挂载的本地挂载点上
        mounter = SMBMounter(self.smb_url, str(self.mount_point))
        mounter.mount(os.environ.get("BACKUP_USER"), os.environ.get("BACKUP_PASS"))
        # 确保目标目录存在
        ensure_dir(self.snapshots_dir)
        ensure_dir(self.meta_dir)
today = dt.date.today()
snap_dir = self._snapshot_dir(today)
ensure_dir(snap_dir)
snap_type = "FULL" if force_full or self._need_full() else "INCR"
self._write_type_marker(snap_dir, snap_type)
logger.info("开始备份(%s):%s", snap_type, today.strftime("%Y-%m-%d"))
# 钩子:停止 Agent
try:
self._run_hook_stop()
except Exception as e:
logger.warning("停止钩子执行失败(继续):%s", e)
try:
# 遍历项目
for project, base in self._project_list():
prev_manifest = self._load_manifest(project)
current_state = self._scan_current_state(base)
if snap_type == "FULL":
changed = sorted(current_state.keys())
deletions: List[str] = []
else:
changed, deletions = self._compute_delta(prev_manifest.get("files", {}), current_state)
# 即便 changed 为空,也创建一个最小 tar 以标记项目(可选)
tar_path, sha_path = self._archive_project(project, base, changed, snap_dir, snap_type, today)
# 写删除集(仅增量)
self._write_deletions(project, deletions, snap_dir, today)
# 更新 manifest(成为下一次基线)
new_manifest = {
"project": project,
"timestamp": now_local().isoformat(),
"files": current_state,
}
self._save_manifest(project, new_manifest)
# 快照中也保存一份项目 manifest(可选,辅助定位)
(snap_dir / f"{project}-manifest.json").write_text(
json.dumps(new_manifest, ensure_ascii=False, indent=2),
encoding="utf-8",
)
finally:
# 钩子:恢复 Agent
try:
self._run_hook_start()
except Exception as e:
logger.warning("恢复钩子执行失败:%s", e)
# 不强制卸载,留给系统/调度使用;如需可以在外层调用 unmount
# 保留策略清理
self._cleanup_retention()
logger.info("备份完成(%s):%s", snap_type, today.strftime("%Y-%m-%d"))
def list_snapshots(self) -> List[Tuple[str, str]]:
res: List[Tuple[str, str]] = []
if not self.snapshots_dir.exists():
return res
for d in sorted(list_subdirs(self.snapshots_dir)):
t = (d / "TYPE").read_text(encoding="utf-8").strip().upper() if (d / "TYPE").exists() else "UNKNOWN"
res.append((d.name, t))
return res
def _locate_restore_chain(self, target_date: dt.date) -> List[Tuple[dt.date, pathlib.Path, str]]:
# 找到 <= target_date 的快照链:最近的 FULL 及之后直到 target_date 的所有快照
items: List[Tuple[dt.date, pathlib.Path, str]] = []
for d in list_subdirs(self.snapshots_dir):
ds = parse_date_dir_name(d.name)
if not ds or ds > target_date:
continue
t = (d / "TYPE").read_text(encoding="utf-8").strip().upper() if (d / "TYPE").exists() else "UNKNOWN"
items.append((ds, d, t))
items.sort()
# 找最近的 FULL
last_full_idx = -1
for i in range(len(items) - 1, -1, -1):
if items[i][2] == "FULL":
last_full_idx = i
break
if last_full_idx == -1:
raise RuntimeError("未找到可用的 FULL 快照")
chain = items[last_full_idx:]
return chain
def _list_project_archives(self, snap_dir: pathlib.Path, project: str) -> List[pathlib.Path]:
return sorted(snap_dir.glob(f"{project}-*.tar.gz"))
def _read_deletions(self, snap_dir: pathlib.Path, project: str) -> List[str]:
res: List[str] = []
for p in snap_dir.glob(f"deletions-{project}-*.txt"):
try:
res += [line.strip() for line in p.read_text(encoding="utf-8").splitlines() if line.strip()]
except Exception:
continue
return res
def _safe_extract(self, tf: tarfile.TarFile, dest: pathlib.Path, preserve_perms: bool):
# 安全解压,防止路径穿越
for m in tf.getmembers():
# 仅允许相对路径
if m.name.startswith("/") or ".." in pathlib.PurePosixPath(m.name).parts:
logger.warning("检测到可疑条目(忽略):%s", m.name)
continue
target = dest / m.name
target_resolved = target.resolve().absolute()
if not str(target_resolved).startswith(str(dest.resolve().absolute())):
logger.warning("越界解压(忽略):%s", m.name)
continue
# 解压
tf.extract(m, path=dest)
if preserve_perms and hasattr(os, "chmod"):
try:
if m.mode is not None:
os.chmod(target_resolved, m.mode)
# 尝试保留 mtime
if m.mtime is not None:
os.utime(target_resolved, (m.mtime, m.mtime), follow_symlinks=False)
except Exception:
pass
@retry(max_retries=DEFAULT_RETRIES, pause_seconds=3.0)
def restore(
self,
target_yyyymmdd: str,
dest_override: Optional[str],
projects: Optional[List[str]],
dry_run: bool,
preserve_perms: bool = True,
):
mounter = SMBMounter(self.smb_url, str(self.mount_point))
mounter.mount(os.environ.get("BACKUP_USER"), os.environ.get("BACKUP_PASS"))
target_date = dt.datetime.strptime(target_yyyymmdd, "%Y%m%d").date()
chain = self._locate_restore_chain(target_date)
logger.info("恢复目标日期:%s;包含 %d 个快照", target_yyyymmdd, len(chain))
# 选择项目
all_projects = dict(self._project_list())
selected: List[Tuple[str, pathlib.Path]] = []
if projects:
for p in projects:
base = all_projects.get(p)
if not base:
raise RuntimeError(f"未知项目:{p}")
selected.append((p, base))
else:
selected = list(all_projects.items())
for project, base in selected:
dest_base = pathlib.Path(dest_override).resolve() if dest_override else base
ensure_dir(dest_base)
logger.info("恢复项目:%s -> %s", project, dest_base)
# 依序应用 FULL + INCR
archives: List[pathlib.Path] = []
deletions_accum: List[str] = []
for ds, d, t in chain:
for arc in self._list_project_archives(d, project):
archives.append(arc)
# 收集删除集
deletions_accum += self._read_deletions(d, project)
if dry_run:
logger.info("[dry-run] 将解压 %d 个归档:%s", len(archives), ", ".join(a.name for a in archives))
if deletions_accum:
logger.info("[dry-run] 将删除 %d 个文件(若存在)", len(deletions_accum))
continue
# 解压归档
for arc in archives:
logger.info("解压:%s", arc)
with tarfile.open(arc, "r:gz") as tf:
self._safe_extract(tf, dest_base, preserve_perms)
# 应用删除集
unique_deletions = sorted(set(deletions_accum))
removed = 0
for rel in unique_deletions:
target = dest_base / rel
try:
if target.is_file() or target.is_symlink():
target.unlink(missing_ok=True)
removed += 1
elif target.is_dir():
shutil.rmtree(target, ignore_errors=True)
removed += 1
except Exception:
pass
if unique_deletions:
logger.info("应用删除集完成:删除 %d", removed)
logger.info("恢复完成:%s", target_yyyymmdd)
def verify(self, target_yyyymmdd: Optional[str]):
mounter = SMBMounter(self.smb_url, str(self.mount_point))
mounter.mount(os.environ.get("BACKUP_USER"), os.environ.get("BACKUP_PASS"))
# 选择快照
if target_yyyymmdd:
snap_dir = self.snapshots_dir / target_yyyymmdd
if not snap_dir.exists():
raise RuntimeError(f"快照不存在:{snap_dir}")
else:
snapshots = sorted(list_subdirs(self.snapshots_dir))
if not snapshots:
raise RuntimeError("无可用快照")
snap_dir = snapshots[-1]
logger.info("校验快照:%s", snap_dir.name)
for sha in snap_dir.glob("*.sha256"):
line = sha.read_text(encoding="utf-8").strip()
if " " in line:
expect, name = line.split(" ", 1)
else:
parts = line.split()
expect = parts[0]
name = parts[-1]
arc = snap_dir / name
if not arc.exists():
logger.error("缺少归档:%s", arc)
continue
got = sha256_file(arc)
if got != expect:
raise RuntimeError(f"校验失败:{arc} (期望 {expect},实际 {got})")
logger.info("OK: %s", name)
logger.info("校验完成:%s", snap_dir.name)
def install_schedule(self, label: str, run_time: str, env_file: Optional[str], script_path: str):
# 生成 launchd plist(/Library/LaunchDaemons/)
hh, mm = run_time.split(":")
plist = f"""<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key><string>{label}</string>
<key>ProgramArguments</key>
<array>
<string>{sys.executable}</string>
<string>{script_path}</string>
<string>backup</string>
</array>
<key>StartCalendarInterval</key>
<dict>
<key>Hour</key><integer>{int(hh)}</integer>
<key>Minute</key><integer>{int(mm)}</integer>
</dict>
<key>RunAtLoad</key><false/>
<key>StandardOutPath</key><string>{DEFAULT_LOG_PATH}</string>
<key>StandardErrorPath</key><string>{DEFAULT_LOG_PATH}</string>
<key>EnvironmentVariables</key>
<dict>
<!-- 若使用 env_file,请在外部管理并由 launchd 的 wrapper 加载;这里通常留空 -->
</dict>
</dict>
</plist>
"""
plist_path = f"/Library/LaunchDaemons/{label}.plist"
pathlib.Path(plist_path).write_text(plist, encoding="utf-8")
logger.info("已写入 launchd plist:%s", plist_path)
# 加载
subprocess.check_call(["launchctl", "load", "-w", plist_path])
logger.info("已加载定时任务(每日 %s)", run_time)
# =========================
# CLI
# =========================
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(description="macOS 项目级增量/全量备份与恢复工具")
sub = p.add_subparsers(dest="cmd", required=True)
common = argparse.ArgumentParser(add_help=False)
common.add_argument("--sources", nargs="+", default=DEFAULT_SOURCES, help="源目录(默认:/Users/build/Projects /Users/build/.ssh)")
common.add_argument("--exclude", nargs="+", default=DEFAULT_EXCLUDES, help="排除模式(支持 ** 通配)")
common.add_argument("--smb-url", default=DEFAULT_SMB_URL, help="SMB 目标,如 smb://10.0.30.20/backup/build")
common.add_argument("--mount-point", default=DEFAULT_MOUNT_POINT, help="挂载点")
common.add_argument("--retention-days", type=int, default=DEFAULT_RETENTION_DAYS, help="保留天数(默认7)")
common.add_argument("--full-interval-days", type=int, default=DEFAULT_FULL_INTERVAL_DAYS, help="全量间隔天数(默认7)")
common.add_argument("--agent-label", default=os.environ.get("BUILD_AGENT_LABEL"), help="构建Agent的 launchd Label(可选)")
common.add_argument("--pre-stop-cmd", default=os.environ.get("HOOK_STOP"), help="备份前停止命令(可选)")
common.add_argument("--post-start-cmd", default=os.environ.get("HOOK_START"), help="备份后恢复命令(可选)")
common.add_argument("--env-file", default=os.environ.get("OP_BACKUP_ENV_FILE"), help="包含 BACKUP_USER/BACKUP_PASS 的安全 env 文件路径")
common.add_argument("--host-name", default=None, help="覆盖主机名,用于备份路径分隔")
p_backup = sub.add_parser("backup", parents=[common], help="执行一次备份")
p_backup.add_argument("--force-full", action="store_true", help="强制全量备份")
p_backup.add_argument("--no-mount", action="store_true", help="不自动挂载(假定已挂载)")
p_restore = sub.add_parser("restore", parents=[common], help="恢复到指定日期")
p_restore.add_argument("--date", required=True, help="目标日期 YYYYMMDD")
p_restore.add_argument("--project", nargs="*", help="指定项目(默认全部)")
p_restore.add_argument("--dest", default=None, help="恢复目标根目录(默认恢复到原路径)")
p_restore.add_argument("--dry-run", action="store_true", help="仅预演,不落盘")
p_restore.add_argument("--no-preserve-perms", action="store_true", help="不保留权限/时间戳")
p_list = sub.add_parser("list", parents=[common], help="列出快照")
p_verify = sub.add_parser("verify", parents=[common], help="校验快照 sha256")
p_verify.add_argument("--date", default=None, help="目标日期 YYYYMMDD(默认最新)")
p_sched = sub.add_parser("install-schedule", parents=[common], help="安装每日 02:30 定时任务(launchd)")
p_sched.add_argument("--time", default="02:30", help="HH:MM,默认 02:30")
p_sched.add_argument("--label", default=SCRIPT_LABEL, help="launchd Label")
p_sched.add_argument("--script-path", default=os.path.abspath(sys.argv[0]), help="脚本绝对路径")
return p
def main():
parser = build_parser()
args = parser.parse_args()
# 读取 env 文件(如提供)
if getattr(args, "env_file", None):
load_env_file(args.env_file)
mgr = BackupManager(
sources=args.sources,
exclude_patterns=args.exclude,
mount_point=args.mount_point,
smb_url=args.smb_url,
retention_days=args.retention_days,
full_interval_days=args.full_interval_days,
agent_label=args.agent_label,
pre_stop_cmd=args.pre_stop_cmd,
post_start_cmd=args.post_start_cmd,
host_name=args.host_name,
)
if args.cmd == "backup":
mgr.backup_once(force_full=args.force_full)
elif args.cmd == "restore":
mgr.restore(
target_yyyymmdd=args.date,
dest_override=args.dest,
projects=args.project,
dry_run=args.dry_run,
preserve_perms=not args.no_preserve_perms,
)
elif args.cmd == "list":
snaps = mgr.list_snapshots()
for d, t in snaps:
print(f"{d} {t}")
elif args.cmd == "verify":
mgr.verify(args.date)
elif args.cmd == "install-schedule":
mgr.install_schedule(args.label, args.time, args.env_file, args.script_path)
else:
parser.print_help()
if __name__ == "__main__":
main()
环境变量:BACKUP_USER / BACKUP_PASS(SMB 凭证,必需,可由 --env-file 加载);BUILD_AGENT_LABEL(构建 Agent 的 launchd Label,可选);HOOK_STOP / HOOK_START(自定义停止/恢复命令,可选);OP_BACKUP_ENV_FILE(env 文件路径,可选)。
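SMB 凭证建议放入仅 root 可读的 env 文件并通过 --env-file 加载,避免出现在命令行与 shell 历史中。以下为示例用法(文件路径、账号名与项目名 MyApp 均为示例假设):

# 创建 0600 权限的凭证文件(KEY=VALUE 格式)
sudo install -m 0600 /dev/null /etc/op_backup.env
sudo tee /etc/op_backup.env >/dev/null <<'EOF'
BACKUP_USER=backup_svc
BACKUP_PASS=请替换为实际密码
EOF

# 常用操作示例
sudo python3 op_backup.py backup --env-file /etc/op_backup.env               # 自动判断全量/增量
sudo python3 op_backup.py backup --env-file /etc/op_backup.env --force-full  # 强制全量
sudo python3 op_backup.py list --env-file /etc/op_backup.env                 # 列出快照及类型
sudo python3 op_backup.py verify --env-file /etc/op_backup.env               # 校验最新快照 sha256
sudo python3 op_backup.py restore --env-file /etc/op_backup.env --date 20240101 --project MyApp --dry-run
sudo python3 op_backup.py install-schedule --env-file /etc/op_backup.env     # 安装每日 02:30 定时任务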
让运维团队在几分钟内从“需求描述”直达“可执行脚本”,覆盖常见场景(如监控告警、日志排障、数据备份与回滚、资源调优、安全巡检),以更少的人力和更低风险完成重复与关键操作。通过标准化、可配置、带注释的脚本生成,显著缩短交付周期,降低误操作概率,提升跨系统环境(Linux/Windows)的稳定性与合规性,最终实现效率提升与成本优化,促进试用转为付费。
利用提示词快速生成监控、日志采集、备份恢复与性能优化脚本;按环境自动选择语言;输出注释与使用指南,提升交付速度并降低误操作。
在发布流水线中一键生成健康检查、滚动备份与资源巡检脚本;结构化日志与错误处理,便于持续集成与问题追踪,缩短故障修复时间。
不必深度编程,即可建立定时备份、磁盘空间告警、服务自检与日志打包脚本;标准化配置参数,快速覆盖Linux与Windows主机。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。