智能运维脚本生成专家

15 浏览
9 试用
0 购买
Oct 27, 2025更新

本提示词专为IT运维人员设计,能够根据用户输入的服务器类型、运维任务类型和具体配置参数,自动生成专业、安全、高效的运维脚本。通过深度分析用户需求,结合最佳实践和安全性考量,提供可立即执行的脚本代码,显著提升运维效率,降低人为错误风险,确保系统稳定运行。支持多种主流操作系统和常见运维场景,包括系统监控、日志分析、备份恢复、性能优化等任务。

脚本概述

  • 功能描述:
    通过拉取 node_exporter 的 /metrics 指标,周期性监控 Linux 服务器的 CPU 使用率、内存使用率、磁盘分区使用率,并记录日志。当 CPU 或内存超过设定阈值时自动触发邮件告警。同时采集并记录当前系统中资源占用最高的进程列表(用于定位问题),支持网络请求/告警发送自动重试与失败告警。
  • 适用环境:
    • 操作系统:Linux
    • 依赖:已部署并可访问的 node_exporter(默认 http://127.0.0.1:9100/metrics)
    • 运行环境:Python 3.8+(仅使用标准库)
  • 执行权限:
    • 普通用户即可运行脚本;
    • 如需写入默认日志目录 /var/log/ops,需具备相应写权限(通常为 root 或调整目录权限到运行用户)。

完整代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Linux系统监控脚本(基于 node_exporter)
- 指标:CPU、内存、磁盘、进程Top
- 告警:CPU/内存阈值触发邮件通知
- 日志:RotatingFileHandler 至指定目录
- 安全:不包含破坏性操作,不硬编码敏感信息

作者:原创
"""

import argparse
import os
import sys
import time
import ssl
import smtplib
import socket
import signal
import subprocess
import re
import threading
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from logging.handlers import RotatingFileHandler
import logging

STOP_EVENT = threading.Event()

DEFAULT_EXCLUDE_FSTYPE = [
    "tmpfs", "devtmpfs", "overlay", "squashfs", "aufs", "ramfs", "iso9660",
    "proc", "sysfs", "cgroup", "cgroup2", "pstore", "debugfs", "bpf", "nsfs"
]
DEFAULT_EXCLUDE_MOUNTPOINT_RE = r'^/(proc|sys|dev|run)($|/)'

def setup_logging(log_dir: str, level: str = "INFO") -> logging.Logger:
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, "ops_monitor.log")
    logger = logging.getLogger("ops_monitor")
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))
    fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8")
    handler.setFormatter(fmt)
    logger.addHandler(handler)
    # 同时输出到控制台(可选)
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(fmt)
    console.setLevel(getattr(logging, level.upper(), logging.INFO))
    logger.addHandler(console)
    return logger

def graceful_exit(signum, frame):
    STOP_EVENT.set()

def http_get_with_retries(url: str, retries: int, timeout: int, logger: logging.Logger) -> str:
    last_err = None
    for attempt in range(1, retries + 1):
        try:
            req = Request(url, headers={"Accept": "text/plain"})
            with urlopen(req, timeout=timeout) as resp:
                charset = resp.headers.get_content_charset() or "utf-8"
                data = resp.read().decode(charset, errors="replace")
                return data
        except (HTTPError, URLError, TimeoutError) as e:
            last_err = e
            logger.warning("HTTP GET failed (attempt %d/%d): %s", attempt, retries, str(e))
            time.sleep(min(2 * attempt, 10))
    raise RuntimeError(f"Failed to fetch metrics from {url}: {last_err}")

def parse_prometheus_text(text: str):
    """
    解析 Prometheus 文本指标为 dict[name] = list of (labels_dict, value_float)
    仅处理简单的 name{labels} value 或 name value 格式。
    """
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # name{l1="v1",l2="v2"} value  或  name value
        m = re.match(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+(-?\d+\.?\d*(?:e[+-]?\d+)?|NaN|Inf|-Inf)\s*$', line)
        if not m:
            # 跳过无法解析的行
            continue
        name, label_str, value_str = m.groups()
        labels = {}
        if label_str:
            # 去掉花括号
            label_content = label_str[1:-1]
            # 简单解析 k="v",允许有逗号分隔
            # 注意:不处理复杂转义场景,仅处理常见格式
            parts = re.findall(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:\\.|[^"\\])*)"', label_content)
            for k, v in parts:
                labels[k] = bytes(v, "utf-8").decode("unicode_escape")
        try:
            if value_str in ("NaN", "Inf", "-Inf"):
                value = float("nan") if value_str == "NaN" else float(value_str)
            else:
                value = float(value_str)
        except Exception:
            continue
        metrics.setdefault(name, []).append((labels, value))
    return metrics

def compute_cpu_usage(prev_metrics: dict, cur_metrics: dict, logger: logging.Logger) -> float:
    """
    依据 node_cpu_seconds_total 两次快照计算总CPU使用率(百分比)
    使用:usage = 1 - (delta_idle / delta_total)
    """
    def sum_metric(mdict, mode_filter=None):
        total = 0.0
        for labels, val in mdict.get("node_cpu_seconds_total", []):
            if "cpu" in labels and labels.get("cpu") == "cpu":
                # 某些系统可能有聚合CPU标签 'cpu="cpu"', 忽略,以各核心求和
                continue
            if mode_filter is None or labels.get("mode") == mode_filter:
                total += val
        return total

    idle_prev = sum_metric(prev_metrics, mode_filter="idle")
    idle_cur = sum_metric(cur_metrics, mode_filter="idle")
    total_prev = sum_metric(prev_metrics, mode_filter=None)
    total_cur = sum_metric(cur_metrics, mode_filter=None)

    delta_idle = idle_cur - idle_prev
    delta_total = total_cur - total_prev

    if delta_total <= 0:
        logger.debug("CPU delta_total <= 0, skip calculation")
        return float("nan")
    usage = max(0.0, min(100.0, (1.0 - (delta_idle / delta_total)) * 100.0))
    return usage

def compute_memory_usage(cur_metrics: dict) -> float:
    """
    使用:1 - MemAvailable / MemTotal
    """
    avail = None
    total = None
    for labels, val in cur_metrics.get("node_memory_MemAvailable_bytes", []):
        avail = val
        break
    for labels, val in cur_metrics.get("node_memory_MemTotal_bytes", []):
        total = val
        break
    if avail is None or total is None or total <= 0:
        return float("nan")
    usage = max(0.0, min(100.0, (1.0 - (avail / total)) * 100.0))
    return usage

def compute_disk_usages(cur_metrics: dict, exclude_fstypes, exclude_mount_re: str):
    """
    返回每个挂载点的使用率字典:{mountpoint: (usage_percent, device, fstype)}
    使用:1 - avail / size
    """
    size_entries = cur_metrics.get("node_filesystem_size_bytes", [])
    avail_entries = cur_metrics.get("node_filesystem_avail_bytes", [])

    # 构建查找表 (device, fstype, mountpoint) -> value
    def key(lbl):
        return (lbl.get("device", ""), lbl.get("fstype", ""), lbl.get("mountpoint", ""))

    size_map = {key(lbl): val for lbl, val in size_entries}
    avail_map = {key(lbl): val for lbl, val in avail_entries}

    result = {}
    mp_ex_re = re.compile(exclude_mount_re) if exclude_mount_re else None

    for k, size in size_map.items():
        device, fstype, mount = k
        if not mount or not fstype:
            continue
        if fstype in exclude_fstypes:
            continue
        if mp_ex_re and mp_ex_re.search(mount):
            continue
        avail = avail_map.get(k)
        if avail is None or size <= 0:
            continue
        usage = max(0.0, min(100.0, (1.0 - (avail / size)) * 100.0))
        result[mount] = (usage, device, fstype)
    return result

def get_top_processes(n: int, logger: logging.Logger):
    """
    通过 'ps' 获取前N个CPU占用最高的进程及其内存占用
    返回列表:[{'pid':..., 'comm':..., 'cpu':..., 'mem':...}, ...]
    注:此项非 node_exporter 指标,为辅助定位问题。
    """
    try:
        # -eo: 指定列,--sort=-%cpu 按CPU降序
        cmd = ["ps", "-eo", "pid,comm,%cpu,%mem", "--sort=-%cpu"]
        out = subprocess.check_output(cmd, text=True)
        lines = out.strip().splitlines()
        # 跳过标题
        data = []
        for ln in lines[1:n+1]:
            parts = ln.split(None, 3)
            if len(parts) < 4:
                continue
            pid, comm, cpu, mem = parts
            data.append({
                "pid": int(pid),
                "comm": comm,
                "cpu": float(cpu),
                "mem": float(mem),
            })
        return data
    except Exception as e:
        logger.warning("Failed to get top processes: %s", e)
        return []

def send_mail(
    smtp_host: str,
    smtp_port: int,
    use_tls: bool,
    mail_from: str,
    mail_to: list,
    subject: str,
    body: str,
    retries: int,
    smtp_user: str = None,
    smtp_pass: str = None,
    logger: logging.Logger = None
):
    if not mail_to:
        if logger:
            logger.warning("No mail recipients specified; skip sending.")
        return
    msg = MIMEText(body, _charset="utf-8")
    msg["Subject"] = subject
    msg["From"] = mail_from
    msg["To"] = ", ".join(mail_to)
    msg["Date"] = formatdate(localtime=True)
    msg["Message-Id"] = make_msgid()

    last_err = None
    for attempt in range(1, retries + 1):
        try:
            if use_tls:
                context = ssl.create_default_context()
                with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as server:
                    server.ehlo()
                    server.starttls(context=context)
                    server.ehlo()
                    if smtp_user and smtp_pass:
                        server.login(smtp_user, smtp_pass)
                    server.sendmail(mail_from, mail_to, msg.as_string())
            else:
                with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as server:
                    server.ehlo()
                    if smtp_user and smtp_pass:
                        server.login(smtp_user, smtp_pass)
                    server.sendmail(mail_from, mail_to, msg.as_string())
            if logger:
                logger.info("Alert mail sent: %s -> %s", subject, mail_to)
            return
        except Exception as e:
            last_err = e
            if logger:
                logger.warning("Send mail failed (attempt %d/%d): %s", attempt, retries, e)
            time.sleep(min(2 * attempt, 10))
    # 发送失败记录错误
    if logger:
        logger.error("Failed to send alert mail after %d attempts: %s", retries, last_err)

def build_alert_body(hostname: str, cpu_usage: float, mem_usage: float, disks_alert: dict, top_procs: list) -> str:
    lines = []
    lines.append(f"Host: {hostname}")
    lines.append(f"CPU Usage: {cpu_usage:.2f}%")
    lines.append(f"Memory Usage: {mem_usage:.2f}%")
    if disks_alert:
        lines.append("Disk Partitions Exceeded Threshold:")
        for mp, (usage, device, fstype) in disks_alert.items():
            lines.append(f"  - {mp} ({device}, {fstype}): {usage:.2f}%")
    if top_procs:
        lines.append("Top Processes by CPU:")
        for p in top_procs:
            lines.append(f"  PID {p['pid']} {p['comm']} - CPU {p['cpu']:.2f}% MEM {p['mem']:.2f}%")
    lines.append("")
    lines.append("This is an automated message.")
    return "\n".join(lines)

def run_monitor(args):
    logger = setup_logging(args.log_dir, args.log_level)
    hostname = socket.gethostname()

    logger.info("Starting ops_monitor on host: %s", hostname)
    logger.info("Node exporter endpoint: %s", args.node_exporter_url)
    logger.info("Monitoring interval: %ds | thresholds: CPU>%s%%, MEM>%s%%, DISK>%s%%",
                args.interval, args.cpu_threshold, args.mem_threshold, args.disk_threshold)

    # 初次抓取,作为 CPU 计算的前快照
    try:
        prev_text = http_get_with_retries(args.node_exporter_url, args.retry, args.timeout, logger)
        prev_metrics = parse_prometheus_text(prev_text)
        prev_time = time.time()
    except Exception as e:
        logger.error("Initial metrics fetch failed: %s", e)
        # 不终止,等待下一轮
        prev_metrics = None
        prev_time = None

    # 主循环
    while not STOP_EVENT.is_set():
        start_ts = time.time()
        # 睡眠到下次周期
        remaining = args.interval - (start_ts - (prev_time or start_ts))
        sleep_sec = max(1.0, remaining)
        STOP_EVENT.wait(sleep_sec)
        if STOP_EVENT.is_set():
            break

        # 抓取当前指标
        try:
            cur_text = http_get_with_retries(args.node_exporter_url, args.retry, args.timeout, logger)
            cur_metrics = parse_prometheus_text(cur_text)
            cur_time = time.time()
        except Exception as e:
            logger.error("Metrics fetch failed: %s", e)
            # 记录失败告警但不中止
            if args.fail_alert_email:
                send_mail(
                    args.smtp_host, args.smtp_port, args.smtp_tls,
                    args.mail_from, args.mail_to,
                    subject=f"[OPS ALERT] Metrics fetch failed on {hostname}",
                    body=f"Failed to fetch metrics from {args.node_exporter_url}: {e}",
                    retries=args.retry,
                    smtp_user=args.smtp_user, smtp_pass=args.smtp_pass,
                    logger=logger
                )
            continue

        # 计算 CPU(需前后快照)
        cpu_usage = float("nan")
        if prev_metrics:
            cpu_usage = compute_cpu_usage(prev_metrics, cur_metrics, logger)
        else:
            logger.debug("No previous CPU snapshot; will compute from next iteration.")

        # 计算内存
        mem_usage = compute_memory_usage(cur_metrics)

        # 计算磁盘
        disk_usages = compute_disk_usages(
            cur_metrics,
            exclude_fstypes=args.exclude_fstype,
            exclude_mount_re=args.exclude_mount_re
        )
        disks_exceeded = {mp: info for mp, info in disk_usages.items() if info[0] >= args.disk_threshold}

        # 获取Top进程(仅记录,不触发阈值告警)
        top_procs = get_top_processes(args.process_top, logger)

        # 日志记录
        cpu_str = "N/A" if cpu_usage != cpu_usage else f"{cpu_usage:.2f}%"
        mem_str = "N/A" if mem_usage != mem_usage else f"{mem_usage:.2f}%"
        logger.info("CPU=%s MEM=%s", cpu_str, mem_str)
        for mp, (usage, device, fstype) in disk_usages.items():
            logger.info("DISK %s (%s,%s)=%.2f%%", mp, device, fstype, usage)
        if top_procs:
            for p in top_procs:
                logger.info("TOP PID=%d COMM=%s CPU=%.2f%% MEM=%.2f%%",
                            p["pid"], p["comm"], p["cpu"], p["mem"])

        # 告警判断(CPU/内存阈值,磁盘阈值独立)
        should_alert = False
        alert_reasons = []
        if cpu_usage == cpu_usage and cpu_usage >= args.cpu_threshold:
            should_alert = True
            alert_reasons.append(f"CPU>{args.cpu_threshold}% (current {cpu_usage:.2f}%)")
        if mem_usage == mem_usage and mem_usage >= args.mem_threshold:
            should_alert = True
            alert_reasons.append(f"MEM>{args.mem_threshold}% (current {mem_usage:.2f}%)")
        if disks_exceeded:
            should_alert = True
            for mp, (usage, _, _) in disks_exceeded.items():
                alert_reasons.append(f"DISK {mp}>{args.disk_threshold}% (current {usage:.2f}%)")

        if should_alert and args.mail_to:
            subject = f"[OPS ALERT] {hostname}: " + "; ".join(alert_reasons)
            body = build_alert_body(hostname, cpu_usage, mem_usage, disks_exceeded, top_procs)
            send_mail(
                args.smtp_host, args.smtp_port, args.smtp_tls,
                args.mail_from, args.mail_to, subject, body,
                retries=args.retry,
                smtp_user=args.smtp_user, smtp_pass=args.smtp_pass,
                logger=logger
            )

        # 更新前快照用于下一轮CPU计算
        prev_metrics = cur_metrics
        prev_time = cur_time

def parse_args():
    parser = argparse.ArgumentParser(description="Linux 系统监控脚本(基于 node_exporter)")
    parser.add_argument("--node-exporter-url", default="http://127.0.0.1:9100/metrics",
                        help="node_exporter /metrics 地址")
    parser.add_argument("--interval", type=int, default=60, help="监控间隔(秒)")
    parser.add_argument("--cpu-threshold", type=float, default=80.0, help="CPU使用率阈值(百分比)")
    parser.add_argument("--mem-threshold", type=float, default=75.0, help="内存使用率阈值(百分比)")
    parser.add_argument("--disk-threshold", type=float, default=85.0, help="磁盘分区使用率阈值(百分比)")
    parser.add_argument("--process-top", type=int, default=5, help="记录前N个CPU最高的进程数量")
    parser.add_argument("--smtp-host", default=os.environ.get("OPS_SMTP_HOST", "smtp.example.com"),
                        help="SMTP服务器地址(默认读取环境变量 OPS_SMTP_HOST)")
    parser.add_argument("--smtp-port", type=int, default=int(os.environ.get("OPS_SMTP_PORT", "587")),
                        help="SMTP服务器端口(默认读取环境变量 OPS_SMTP_PORT 或 587)")
    parser.add_argument("--smtp-tls", action="store_true", default=True,
                        help="使用STARTTLS(默认启用)")
    parser.add_argument("--smtp-user", default=os.environ.get("OPS_SMTP_USER"),
                        help="SMTP用户名(可使用环境变量 OPS_SMTP_USER)")
    parser.add_argument("--smtp-pass", default=os.environ.get("OPS_SMTP_PASS"),
                        help="SMTP密码(可使用环境变量 OPS_SMTP_PASS)")
    parser.add_argument("--mail-from", default=None,
                        help="告警发件人(默认 ops-monitor@<hostname>)")
    parser.add_argument("--mail-to", nargs="*", default=[],
                        help="告警收件人列表(空则不发送邮件)")
    parser.add_argument("--retry", type=int, default=3, help="自动重试次数(网络/邮件)")
    parser.add_argument("--timeout", type=int, default=10, help="HTTP请求超时(秒)")
    parser.add_argument("--log-dir", default="/var/log/ops", help="日志目录")
    parser.add_argument("--log-level", default="INFO", help="日志级别(DEBUG/INFO/WARN/ERROR)")
    parser.add_argument("--exclude-fstype", nargs="*", default=DEFAULT_EXCLUDE_FSTYPE,
                        help="磁盘过滤的文件系统类型列表(不统计)")
    parser.add_argument("--exclude-mount-re", dest="exclude_mount_re", default=DEFAULT_EXCLUDE_MOUNTPOINT_RE,
                        help="磁盘过滤的挂载点正则(不统计)")
    parser.add_argument("--fail-alert-email", action="store_true", default=False,
                        help="抓取失败时发送故障告警邮件")
    args = parser.parse_args()
    if not args.mail_from:
        args.mail_from = f"ops-monitor@{socket.gethostname()}"
    return args

def main():
    # 信号处理(优雅退出)
    signal.signal(signal.SIGINT, graceful_exit)
    signal.signal(signal.SIGTERM, graceful_exit)
    args = parse_args()
    try:
        run_monitor(args)
    except Exception as e:
            # 顶层兜底日志
            logger = setup_logging(args.log_dir, args.log_level)
            logger.exception("Monitor terminated with error: %s", e)
            sys.exit(1)

if __name__ == "__main__":
    main()

参数说明

  • --node-exporter-url
    node_exporter 的 /metrics 地址,默认 http://127.0.0.1:9100/metrics
  • --interval
    监控采集间隔,单位秒。默认 60
  • --cpu-threshold
    CPU 使用率告警阈值(百分比),默认 80.0
  • --mem-threshold
    内存使用率告警阈值(百分比),默认 75.0
  • --disk-threshold
    磁盘分区使用率告警阈值(百分比),默认 85.0
  • --process-top
    记录前 N 个 CPU 占用最高的进程数量,默认 5(仅日志记录,不触发告警)
  • --smtp-host
    SMTP 服务器地址,默认读取环境变量 OPS_SMTP_HOST,否则使用 smtp.example.com
  • --smtp-port
    SMTP 服务器端口,默认读取环境变量 OPS_SMTP_PORT,否则 587
  • --smtp-tls
    是否使用 STARTTLS,默认启用
  • --smtp-user
    SMTP 用户名,默认读取环境变量 OPS_SMTP_USER(不建议通过参数传递敏感信息)
  • --smtp-pass
    SMTP 密码,默认读取环境变量 OPS_SMTP_PASS(不建议通过参数传递敏感信息)
  • --mail-from
    告警邮件发件人地址,默认 ops-monitor@
  • --mail-to
    告警邮件收件人列表(空列表则不发送邮件)
  • --retry
    网络请求和邮件发送的自动重试次数,默认 3
  • --timeout
    HTTP 请求超时时间(秒),默认 10
  • --log-dir
    日志目录,默认 /var/log/ops
  • --log-level
    日志级别(DEBUG/INFO/WARN/ERROR),默认 INFO
  • --exclude-fstype
    不统计的文件系统类型列表(用于过滤临时/伪文件系统),默认包含 tmpfs/devtmpfs/overlay 等
  • --exclude-mount-re
    不统计的挂载点正则表达式,默认过滤 /proc /sys /dev /run 等
  • --fail-alert-email
    抓取 node_exporter 指标失败时是否发送故障告警邮件,默认 False

使用示例

  1. 按默认配置运行(60秒间隔,阈值:CPU>80%、内存>75%,磁盘>85%),发送告警到指定邮箱:
python3 ops_monitor.py --mail-to ops-alerts@example.com
  1. 指定 node_exporter 地址与自定义日志目录,启用抓取失败告警:
python3 ops_monitor.py \
  --node-exporter-url http://127.0.0.1:9100/metrics \
  --log-dir /var/log/ops \
  --fail-alert-email \
  --mail-to ops-alerts@example.com ops-backup@example.com
  1. 自定义阈值与SMTP参数(通过环境变量传入凭据):
export OPS_SMTP_HOST=smtp.example.com
export OPS_SMTP_PORT=587
export OPS_SMTP_USER=monitor_user
export OPS_SMTP_PASS='your_app_password'
python3 ops_monitor.py \
  --cpu-threshold 85 --mem-threshold 80 --disk-threshold 90 \
  --mail-to ops-alerts@example.com

注意事项

  • 执行前的准备工作
    • 确保 node_exporter 已在本机运行且可访问(默认端口 9100)。可检查:curl http://127.0.0.1:9100/metrics
    • 保障日志目录(默认 /var/log/ops)存在且对运行用户可写:例如 mkdir -p /var/log/ops && chown : /var/log/ops
    • 若需邮件告警,配置 SMTP(主机、端口、TLS、账号与密码);建议通过环境变量传递凭据,避免硬编码。
  • 可能的风险和防范措施
    • 邮件轰炸风险:在持续超阈值情况下会每个周期发送一次告警。可通过外部MTA限流,或在脚本上层(如 systemd timer)调整周期;需要更精细的抑制策略可在后续版本加入告警冷却时间与恢复通知。
    • 指标解析依赖 node_exporter 的标准指标名称;若使用非标准构建或启用了额外采集器,可能导致解析失败或数据偏差。
    • 进程信息使用 ps 命令采集,不依赖 node_exporter;若要求严格仅来自 node_exporter,请关闭该功能或忽略该日志项。
  • 性能影响和资源消耗说明
    • 每个周期进行一次 HTTP 拉取与文本解析,计算量较小;默认间隔60秒对系统资源影响极低。
    • 日志采用大小滚动,默认最大10MB*5个文件;长期运行磁盘占用可控。
    • 进程列表采集使用 ps,开销低;如在超大进程数环境中,可将 --process-top 调小或禁用(设为0则不采集)。

脚本概述

  • 功能描述:从 Windows 事件日志(System、Application)中提取最近24小时内,消息包含“error”或“failed”的事件,按指定时区(UTC+8)计算时间窗口。输出两份 CSV 报表:详细事件列表和按事件ID的汇总统计,并记录运行日志。
  • 适用环境:Windows Server/Windows 10+;PowerShell 5.1 或更高版本;可读取事件日志的账号(建议 Event Log Readers 组或本地管理员)。
  • 执行权限:读取事件日志权限;无需管理员权限(若某些日志访问受限,脚本会跳过并记录告警)。

完整代码

<# 
.SYNOPSIS
  Windows 事件日志关键字分析与按事件ID汇总导出(CSV)

.DESCRIPTION
  - 从指定日志(默认 System, Application)提取最近 N 小时事件,时间窗口基于指定时区(默认 UTC+8)
  - 过滤消息中包含给定关键字正则(默认 (?i)(error|failed))
  - 导出详细事件 CSV 与按事件ID的汇总 CSV
  - 输出运行日志到报告目录

.NOTES
  兼容 PowerShell 5.1+,不使用已弃用的 Get-EventLog;采用 Get-WinEvent。
#>

[CmdletBinding()]
param(
  [Parameter(Mandatory=$false)]
  [string[]]$LogNames = @('System','Application'),

  [Parameter(Mandatory=$false)]
  [ValidateRange(1,720)]
  [int]$LookbackHours = 24,

  [Parameter(Mandatory=$false)]
  [string]$KeywordsPattern = '(?i)(error|failed)',

  [Parameter(Mandatory=$false)]
  [ValidateNotNullOrEmpty()]
  [string]$ReportDir = 'C:\Ops\reports',

  [Parameter(Mandatory=$false)]
  [ValidateNotNullOrEmpty()]
  [string]$TimeZone = 'UTC+8',   # 支持 Windows 时区ID(如 "China Standard Time")或 "UTC±HH[:mm]"

  [Parameter(Mandatory=$false)]
  [ValidateRange(64,100000)]
  [int]$MaxMessageLength = 4096  # 导出时对消息截断长度,避免CSV过大
)

Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'

#-------------------- 工具函数 --------------------#
function Write-Log {
  param(
    [Parameter(Mandatory=$true)][string]$Message,
    [ValidateSet("INFO","WARN","ERROR")][string]$Level = "INFO"
  )
  $ts = (Get-Date).ToString("yyyy-MM-dd HH:mm:ss.fff")
  $line = "[{0}] [{1}] {2}" -f $ts, $Level, $Message
  Write-Host $line
  if ($script:LogFile) { Add-Content -Path $script:LogFile -Value $line -Encoding UTF8 }
}

function Get-TimeZoneInfoSafe {
  param([Parameter(Mandatory=$true)][string]$TzSpec)

  # 接受 "Local"
  if ($TzSpec -match '^(?i)local$') {
    return [System.TimeZoneInfo]::Local
  }

  # 先尝试 Windows 时区ID
  try {
    return [System.TimeZoneInfo]::FindSystemTimeZoneById($TzSpec)
  } catch {
    # 解析 "UTC±HH[:mm]" 格式
    if ($TzSpec -match '^\s*UTC\s*([+-])\s*(\d{1,2})(?::?(\d{2}))?\s*$') {
      $sign = $matches[1]
      $h = [int]$matches[2]
      $m = if ($matches[3]) { [int]$matches[3] } else { 0 }
      if ($h -gt 14 -or $m -ge 60) {
        throw "无效的 UTC 偏移:$TzSpec"
      }
      $offset = New-Object System.TimeSpan($h, $m, 0)
      if ($sign -eq '-') {
        $offset = -$offset
      }
      $id = "UTC{0}{1:00}:{2:00}" -f $sign, [math]::Abs($offset.Hours), [math]::Abs($offset.Minutes)
      return [System.TimeZoneInfo]::CreateCustomTimeZone($id, $offset, $id, $id)
    }
    throw "无法识别的时区:'$TzSpec'。请使用 Windows 时区ID(如 'China Standard Time')或 'UTC±HH[:mm]'。"
  }
}

function Get-TimeWindowInLocal {
  param(
    [Parameter(Mandatory=$true)][System.TimeZoneInfo]$TargetTz,
    [Parameter(Mandatory=$true)][int]$Hours
  )
  # 以目标时区的“当前时刻”为基准,计算 [now - Hours, now],再转换到本地时区用于高效过滤
  $nowUtc = [DateTime]::UtcNow
  $nowTz = [System.TimeZoneInfo]::ConvertTimeFromUtc($nowUtc, $TargetTz)
  $startTz = $nowTz.AddHours(-$Hours)
  $endTz   = $nowTz

  $startLocal = [System.TimeZoneInfo]::ConvertTime($startTz, $TargetTz, [System.TimeZoneInfo]::Local)
  $endLocal   = [System.TimeZoneInfo]::ConvertTime($endTz,   $TargetTz, [System.TimeZoneInfo]::Local)

  return [PSCustomObject]@{
    StartLocal = $startLocal
    EndLocal   = $endLocal
    NowTz      = $nowTz
    StartTz    = $startTz
    EndTz      = $endTz
  }
}

function Truncate-String {
  param(
    [AllowNull()][string]$Text,
    [int]$Max = 4096
  )
  if ([string]::IsNullOrEmpty($Text)) { return $Text }
  if ($Text.Length -le $Max) { return $Text }
  return $Text.Substring(0, $Max)
}

#-------------------- 初始化 --------------------#
$script:ExitCode = 0
$created = $false
try {
  if (-not (Test-Path -LiteralPath $ReportDir)) {
    New-Item -ItemType Directory -Force -Path $ReportDir | Out-Null
    $created = $true
  }
} catch {
  Write-Host "无法创建报告目录:$ReportDir"
  throw
}

$stamp    = (Get-Date -Format 'yyyyMMdd_HHmmss')
$DetailCsv = Join-Path $ReportDir ("event_details_{0}.csv" -f $stamp)
$SummaryCsv = Join-Path $ReportDir ("event_summary_byId_{0}.csv" -f $stamp)
$script:LogFile = Join-Path $ReportDir ("run_{0}.log" -f $stamp)

Write-Log "报告目录:$ReportDir(新建:$created)"
Write-Log "详情CSV:$DetailCsv"
Write-Log "汇总CSV:$SummaryCsv"

# 编译正则
try {
  $Regex = [System.Text.RegularExpressions.Regex]::new($KeywordsPattern, [System.Text.RegularExpressions.RegexOptions]::IgnoreCase)
  Write-Log "关键字正则:$KeywordsPattern"
} catch {
  Write-Log "关键字正则无效:$KeywordsPattern" "ERROR"
  throw
}

# 时区与时间窗口
$tz = Get-TimeZoneInfoSafe -TzSpec $TimeZone
$tw = Get-TimeWindowInLocal -TargetTz $tz -Hours $LookbackHours
Write-Log ("时区:{0} | 窗口({1}h):{2} ~ {3}(按时区)" -f $tz.Id, $LookbackHours, $tw.StartTz.ToString('yyyy-MM-dd HH:mm:ss zzz'), $tw.EndTz.ToString('yyyy-MM-dd HH:mm:ss zzz'))

#-------------------- 采集与过滤 --------------------#
$allEvents = New-Object System.Collections.Generic.List[System.Diagnostics.Eventing.Reader.EventRecord]
$failedLogs = @()
foreach ($ln in $LogNames) {
  try {
    Write-Log "读取日志:$ln(本地时间窗:$($tw.StartLocal) ~ $($tw.EndLocal))"
    $part = Get-WinEvent -FilterHashtable @{ LogName = $ln; StartTime = $tw.StartLocal; EndTime = $tw.EndLocal } -ErrorAction Stop
    foreach ($e in $part) { [void]$allEvents.Add($e) }
    Write-Log ("读取完成:{0} 条" -f ($part | Measure-Object | Select-Object -ExpandProperty Count))
  } catch {
    Write-Log ("读取日志失败:{0} | {1}" -f $ln, $_.Exception.Message) "WARN"
    $failedLogs += $ln
    $script:ExitCode = 2
    continue
  }
}

Write-Log ("合计原始事件:{0}" -f $allEvents.Count)

# 消息过滤(关键字)
$filtered = @()
foreach ($rec in $allEvents) {
  try {
    $msg = $rec.Message
    if ([string]::IsNullOrEmpty($msg)) { continue }
    if ($Regex.IsMatch($msg)) { $filtered += $rec }
  } catch {
    # 某些记录在格式化消息时可能异常,忽略该条
    Write-Log ("消息读取异常,已跳过:Log={0}, Id={1}, Time={2}" -f $rec.LogName, $rec.Id, $rec.TimeCreated) "WARN"
    continue
  }
}

Write-Log ("关键字命中事件:{0}" -f $filtered.Count)

#-------------------- 导出详情 --------------------#
$detailRows = $filtered | ForEach-Object {
  $evtLocalTime = $_.TimeCreated
  $evtTzTime = [System.TimeZoneInfo]::ConvertTime($evtLocalTime, [System.TimeZoneInfo]::Local, $tz)
  [PSCustomObject]@{
    EventTime         = $evtTzTime.ToString('yyyy-MM-dd HH:mm:ss zzz')
    TimeZone          = $tz.Id
    LogName           = $_.LogName
    EventId           = $_.Id
    Level             = $_.LevelDisplayName
    Provider          = $_.ProviderName
    Machine           = $_.MachineName
    Keywords          = ($_.KeywordsDisplayNames -join ';')
    Message           = Truncate-String -Text $_.Message -Max $MaxMessageLength
  }
}

$detailRows | Export-Csv -Path $DetailCsv -NoTypeInformation -Encoding UTF8
Write-Log ("详情已导出:{0}" -f $DetailCsv)

#-------------------- 导出汇总(按事件ID) --------------------#
$summaryRows =
  $filtered |
  Group-Object -Property Id |
  Sort-Object -Property Count -Descending |
  ForEach-Object {
    $group = $_.Group
    $firstLocal = $group | Sort-Object TimeCreated | Select-Object -First 1 -ExpandProperty TimeCreated
    $lastLocal  = $group | Sort-Object TimeCreated -Descending | Select-Object -First 1 -ExpandProperty TimeCreated
    $firstTz = [System.TimeZoneInfo]::ConvertTime($firstLocal, [System.TimeZoneInfo]::Local, $tz)
    $lastTz  = [System.TimeZoneInfo]::ConvertTime($lastLocal,  [System.TimeZoneInfo]::Local, $tz)
    [PSCustomObject]@{
      EventId     = $_.Name
      Count       = $_.Count
      LogNames    = ($group | Select-Object -ExpandProperty LogName -Unique | Sort-Object | Out-String).Trim() -replace '\r?\n', '; '
      FirstSeen   = $firstTz.ToString('yyyy-MM-dd HH:mm:ss zzz')
      LastSeen    = $lastTz.ToString('yyyy-MM-dd HH:mm:ss zzz')
      Providers   = ($group | Select-Object -ExpandProperty ProviderName -Unique | Sort-Object | Out-String).Trim() -replace '\r?\n', '; '
      Levels      = ($group | Select-Object -ExpandProperty LevelDisplayName -Unique | Sort-Object | Out-String).Trim() -replace '\r?\n', '; '
    }
  }

$summaryRows | Export-Csv -Path $SummaryCsv -NoTypeInformation -Encoding UTF8
Write-Log ("汇总已导出:{0}" -f $SummaryCsv)

if ($failedLogs.Count -gt 0) {
  Write-Log ("注意:以下日志读取失败且已跳过 -> {0}" -f ($failedLogs -join ', ')) "WARN"
}

Write-Log "完成"
exit $script:ExitCode

参数说明

  • LogNames

    • 含义:要查询的事件日志名称数组
    • 默认值:["System","Application"]
    • 取值范围:存在于本机的日志名称,如 "System","Application","Security" 等(受权限限制)
  • LookbackHours

    • 含义:向前回溯的时间窗口(小时数),基于指定时区计算
    • 默认值:24
    • 取值范围:1~720
  • KeywordsPattern

    • 含义:用于匹配事件消息的正则表达式(不区分大小写)
    • 默认值:"(?i)(error|failed)"
    • 取值范围:合法的 .NET 正则表达式
  • ReportDir

    • 含义:CSV 和运行日志的输出目录
    • 默认值:"C:\Ops\reports"
    • 取值范围:有效的本地目录路径(若不存在会自动创建)
  • TimeZone

    • 含义:计算时间窗口与显示事件时间所用的时区
    • 默认值:"UTC+8"
    • 取值范围:
      • Windows 时区ID(如 "China Standard Time", "Tokyo Standard Time")
      • 或 "UTC±HH[:mm]" 格式(如 "UTC+8", "UTC+09:30")
      • 或 "Local" 表示使用本机时区
  • MaxMessageLength

    • 含义:导出到 CSV 时消息字段的最大长度(超出将截断)
    • 默认值:4096
    • 取值范围:64~100000(建议适度控制,避免CSV过大)
  • 退出码

    • 0:全部成功
    • 2:部分日志读取失败(已跳过并继续)
    • 其他非零:异常终止

使用示例

  1. 按需求默认执行(System、Application,最近24h,UTC+8,关键字 error|failed,导出到 C:\Ops\reports)
  • 命令:
    • powershell.exe -ExecutionPolicy Bypass -File .\WinEventLogAnalyzer.ps1
  1. 自定义关键字与时间窗口(过去12小时,匹配 timeout 或 denied),改存储目录
  • 命令:
    • powershell.exe -File .\WinEventLogAnalyzer.ps1 -LookbackHours 12 -KeywordsPattern '(?i)(timeout|denied)' -ReportDir 'D:\Ops\reports'
  1. 指定其他时区(使用 Windows 时区ID)
  • 命令:
    • powershell.exe -File .\WinEventLogAnalyzer.ps1 -TimeZone 'China Standard Time' -LogNames 'Application','System','Setup'

注意事项

  • 执行前准备

    • 使用具备读取事件日志权限的账号(建议加入本地 Event Log Readers 组或以管理员运行)。
    • 确保目标目录存在或可创建(脚本会自动创建)。
  • 关键点与防范

    • 时间窗口严格基于指定时区计算,然后转换为本地时间对事件进行高效过滤,避免跨时区误差。
    • 消息内容匹配使用正则表达式,复杂正则可能影响性能;建议按需优化关键字。
    • 访问某些日志(如 Security)可能受限,脚本会跳过并在日志中告警,不会中断整体执行。
  • 性能与资源

    • 已使用 -FilterHashtable 限定日志与时间窗口,减少扫描范围。
    • 访问事件 Message 字段会触发格式化,成本较高;如数据量巨大,可先缩短时间窗口、减少日志数或放宽关键字以优化性能。
    • 导出的 CSV 在事件数量大时会较大,建议适度调整 MaxMessageLength 或分批执行。
  • 合规与安全

    • 脚本不写入或修改任何系统配置,不包含删除或权限提升操作。
    • 未在脚本中硬编码任何敏感信息;请勿将敏感内容写入报告目录的共享位置。

脚本概述

  • 功能描述:
    该脚本用于对指定目录进行定期备份,支持压缩(tar.gz)、加密(AES-256, OpenSSL)、校验(SHA256),并按照快照标签标识备份版本。脚本在备份前进行磁盘剩余空间预检(>20%),自动进行保留策略(7天)管理(安全迁移为过期区而非直接删除),提供按标签进行恢复到安全的“还原暂存目录”以便人工核验后再上线。支持生成或安装每日 02:00 的计划任务(cron)。

  • 适用环境:

    • 操作系统:Unix/Linux(bash、cron、tar、gzip、openssl、sha256sum)
    • 依赖命令:tar、gzip、openssl、sha256sum、df、find、awk、sed、date、mkdir、ls、crontab(可选)
  • 执行权限:

    • 备份与校验:需要对源目录的读权限、对备份目录的写权限
    • 加密:需要可读密钥文件(建议密钥文件权限为 400/600)
    • 安装计划任务(可选):安装到系统范围需 root,安装到当前用户的 crontab 不必 root

完整代码

#!/usr/bin/env bash
# safe_backup.sh
# 安全备份与按标签恢复脚本(tar.gz + AES-256 + SHA256)
# - 预检磁盘剩余 > 20%
# - 备份文件 SHA256 校验
# - 备份加密(AES-256-CBC, PBKDF2, salt)
# - 保留策略:超期备份迁移到 .expired/ 目录(不直接删除)
# - 按快照标签恢复到安全暂存目录(不覆盖生产数据)
# - 可输出/安装每日 02:00 的 cron 任务

set -euo pipefail

# ========= 默认配置(可通过环境变量或命令行覆盖) =========
SOURCE_DIR="${SOURCE_DIR:-/opt/data}"            # 备份路径(源目录)
BACKUP_ROOT="${BACKUP_ROOT:-/var/backups}"       # 备份根目录(目标根)
RETENTION_DAYS="${RETENTION_DAYS:-7}"            # 保留天数
CHECKSUM_ALGO="${CHECKSUM_ALGO:-sha256}"         # 校验算法(固定为sha256)
ENCRYPTION_ALGO="${ENCRYPTION_ALGO:-aes-256-cbc}"# 加密算法
KEY_PATH="${KEY_PATH:-/etc/secure/key}"          # 密钥路径(pass file)
PRECHECK_MIN_FREE_PERCENT="${PRECHECK_MIN_FREE_PERCENT:-20}"  # 磁盘剩余阈值(%)
LOG_FILE="${LOG_FILE:-/var/log/safe_backup.log}" # 日志文件

# 压缩格式固定 tar.gz
COMPRESSION_EXT="tar.gz"
# 备份文件主名前缀
BACKUP_PREFIX="backup"

# ========= 日志与工具 =========
umask 077

log() {
  # 级别 INFO/WARN/ERROR;同时输出到stdout和日志文件
  local level="$1"; shift
  local ts
  ts="$(date '+%Y-%m-%d %H:%M:%S')"
  echo "[$ts] [$level] $*" | tee -a "$LOG_FILE"
}

require_cmd() {
  local cmd="$1"
  if ! command -v "$cmd" >/dev/null 2>&1; then
    log ERROR "命令未找到: $cmd"
    exit 1
  fi
}

# ========= 预检 =========
precheck() {
  # 检查依赖与环境
  for c in tar gzip openssl sha256sum df find awk sed date mkdir ls; do
    require_cmd "$c"
  done

  if [ ! -d "$SOURCE_DIR" ]; then
    log ERROR "源目录不存在: $SOURCE_DIR"
    exit 1
  fi

  mkdir -p "$BACKUP_ROOT" || true

  # 备份子目录(按照源目录名归档)
  local src_base
  src_base="$(basename "$SOURCE_DIR")"
  BACKUP_DIR="${BACKUP_DIR:-$BACKUP_ROOT/$src_base}"
  EXPIRED_DIR="$BACKUP_DIR/.expired"
  RESTORE_DIR_ROOT="$BACKUP_DIR/restores"
  mkdir -p "$BACKUP_DIR" "$EXPIRED_DIR" "$RESTORE_DIR_ROOT"

  # 磁盘剩余空间检查(基于 BACKUP_ROOT 所在分区)
  local used_percent free_percent
  used_percent="$(df -P "$BACKUP_ROOT" | awk 'NR==2{gsub("%","",$5); print $5}')"
  free_percent=$((100 - used_percent))
  if [ "$free_percent" -lt "$PRECHECK_MIN_FREE_PERCENT" ]; then
    log ERROR "磁盘剩余不足: ${free_percent}% < ${PRECHECK_MIN_FREE_PERCENT}% (挂载点: $BACKUP_ROOT)"
    exit 1
  fi

  # 密钥文件检查
  if [ ! -f "$KEY_PATH" ]; then
    log ERROR "密钥文件不存在: $KEY_PATH"
    exit 1
  fi
  # 权限提示(不强制退出)
  local perm
  perm="$(stat -c '%a' "$KEY_PATH" 2>/dev/null || echo '?')"
  log INFO "密钥文件: $KEY_PATH (权限: $perm)"

  # 加锁避免并发
  LOCK_FILE="$BACKUP_DIR/.backup.lock"
  exec 9>"$LOCK_FILE"
  if ! flock -n 9; then
    log ERROR "已有备份进程在运行(锁文件: $LOCK_FILE)"
    exit 1
  fi
}

# ========= 保留策略(安全迁移) =========
expire_old_backups() {
  # 将超期备份迁移到 .expired/<timestamp>/ 目录中,避免直接删除
  local ts exp_dir
  ts="$(date '+%Y%m%d_%H%M%S')"
  exp_dir="$EXPIRED_DIR/$ts"
  mkdir -p "$exp_dir"

  log INFO "开始执行保留策略:迁移超过 ${RETENTION_DAYS} 天的备份到 $exp_dir"
  # 只处理符合命名规范的备份文件
  while IFS= read -r -d '' f; do
    local base meta sum
    base="$(basename "$f")"
    meta="${f%.*}.meta.json"
    sum="$f.$CHECKSUM_ALGO"
    for item in "$f" "$meta" "$sum"; do
      if [ -f "$item" ]; then
        log INFO "迁移过期文件: $(basename "$item")"
        mv "$item" "$exp_dir/"
      fi
    done
  done < <(find "$BACKUP_DIR" -maxdepth 1 -type f -name "${BACKUP_PREFIX}-*.${COMPRESSION_EXT}.enc" -mtime +"$RETENTION_DAYS" -print0)

  log INFO "保留策略完成:请人工确认 $exp_dir 后再统一清理。"
}

# ========= 生成文件名 =========
sanitize_label() {
  echo "$1" | sed 's/[^A-Za-z0-9_-]/_/g'
}

build_backup_paths() {
  local label="$1"
  local ts
  ts="$(date '+%Y%m%d_%H%M%S')"
  local suffix=""
  if [ -n "$label" ]; then
    suffix="-$label"
  fi
  BACKUP_NAME="${BACKUP_PREFIX}-${ts}${suffix}.${COMPRESSION_EXT}.enc"
  BACKUP_PATH="$BACKUP_DIR/$BACKUP_NAME"
  META_PATH="${BACKUP_PATH%.*}.meta.json"
  CHECKSUM_PATH="$BACKUP_PATH.$CHECKSUM_ALGO"
}

# ========= 备份 =========
do_backup() {
  local raw_label="${1:-}" label
  label="$(sanitize_label "$raw_label")"
  precheck
  build_backup_paths "$label"

  log INFO "开始备份:源=$SOURCE_DIR 目标=$BACKUP_PATH 标签=${label:-无}"
  # tar.gz -> openssl 加密到文件
  # 注意:-pass file:$KEY_PATH 不在命令行暴露明文口令
  tar -C "$SOURCE_DIR" -czf - . \
    | openssl enc -"${ENCRYPTION_ALGO}" -salt -pbkdf2 -pass "file:$KEY_PATH" -out "$BACKUP_PATH"

  log INFO "生成校验($CHECKSUM_ALGO):$CHECKSUM_PATH"
  sha256sum "$BACKUP_PATH" > "$CHECKSUM_PATH"

  log INFO "写入元数据:$META_PATH"
  cat > "$META_PATH" <<EOF
{
  "source_dir": "$(printf '%s' "$SOURCE_DIR")",
  "backup_dir": "$(printf '%s' "$BACKUP_DIR")",
  "backup_file": "$(printf '%s' "$BACKUP_NAME")",
  "timestamp": "$(date '+%Y-%m-%d %H:%M:%S %Z')",
  "label": "$(printf '%s' "${label}")",
  "compression": "tar.gz",
  "encryption": "AES-256-CBC + PBKDF2 + salt (OpenSSL)",
  "checksum_algo": "sha256",
  "checksum_file": "$(basename "$CHECKSUM_PATH")"
}
EOF

  log INFO "备份完成:$BACKUP_PATH"
  expire_old_backups
}

# ========= 校验 =========
verify_backup_file() {
  local file="$1"
  local sum_file="${file}.$CHECKSUM_ALGO"
  if [ ! -f "$sum_file" ]; then
    log ERROR "校验文件不存在:$sum_file"
    exit 1
  fi
  log INFO "校验备份:$file"
  (cd "$(dirname "$file")" && sha256sum -c "$(basename "$sum_file")")
  log INFO "校验通过:$file"
}

# ========= 查找最新指定标签的备份 =========
find_latest_by_label() {
  local label="$1"
  local pattern
  # 例如 backup-*-daily.tar.gz.enc
  pattern="${BACKUP_PREFIX}-*-${label}.${COMPRESSION_EXT}.enc"
  local latest
  latest="$(ls -1t "$BACKUP_DIR"/$pattern 2>/dev/null | head -n 1 || true)"
  if [ -z "$latest" ]; then
    log ERROR "未找到指定标签的备份:$label"
    exit 1
  fi
  echo "$latest"
}

# ========= 恢复(到安全暂存目录) =========
do_restore() {
  local raw_label="${1:-}"
  local target_dir_opt="${2:-}"  # 可选,指定还原目标目录
  local label
  label="$(sanitize_label "$raw_label")"
  precheck

  local backup_file
  backup_file="$(find_latest_by_label "$label")"
  verify_backup_file "$backup_file"

  local ts
  ts="$(date '+%Y%m%d_%H%M%S')"

  local restore_target
  if [ -n "$target_dir_opt" ]; then
    restore_target="$target_dir_opt"
  else
    restore_target="$RESTORE_DIR_ROOT/${ts}_${label}"
  fi
  mkdir -p "$restore_target"

  log INFO "开始恢复到暂存目录:$restore_target (不会覆盖生产目录)"
  openssl enc -"${ENCRYPTION_ALGO}" -d -pbkdf2 -pass "file:$KEY_PATH" -in "$backup_file" \
    | tar -xzf - -C "$restore_target"

  log INFO "恢复完成:$restore_target"
  log INFO "请核验后再进行人工替换或同步至生产目录(建议使用无删除策略的同步方式)。"
}

# ========= 计划任务(cron) =========
print_cron_line() {
  # 每日 02:00 执行备份,标签为 daily
  local script_path="${SCRIPT_PATH:-$(realpath "$0")}"
  echo "0 2 * * * ${script_path} backup --label daily >> ${LOG_FILE} 2>&1"
}

install_user_cron() {
  local line
  line="$(print_cron_line)"
  log INFO "将如下行安装到当前用户的 crontab:"
  echo "$line"
  (crontab -l 2>/dev/null; echo "$line") | crontab -
  log INFO "安装完成(当前用户)。"
}

# ========= 使用帮助 =========
usage() {
  cat <<EOF
用法:$0 <subcommand> [选项]
子命令:
  backup [--label LABEL]           执行备份(tar.gz + AES-256 + sha256),并执行保留策略
  restore --label LABEL [--target DIR]
                                   按标签恢复到安全暂存目录或指定目录(不覆盖生产)
  schedule --print                 输出每日 02:00 的 cron 行
  schedule --install               安装到当前用户的 crontab,每日 02:00 执行
  verify --file /path/to/backup.tar.gz.enc
                                   对指定备份文件进行 sha256 校验
环境变量(或配置):
  SOURCE_DIR                源目录(默认 /opt/data)
  BACKUP_ROOT               备份根(默认 /var/backups)
  RETENTION_DAYS            保留天数(默认 7)
  KEY_PATH                  密钥路径(默认 /etc/secure/key)
  PRECHECK_MIN_FREE_PERCENT 磁盘剩余阈值(默认 20)
  LOG_FILE                  日志文件(默认 /var/log/safe_backup.log)
EOF
}

# ========= 参数解析 =========
main() {
  if [ $# -lt 1 ]; then
    usage; exit 1
  fi
  local sub="$1"; shift
  case "$sub" in
    backup)
      local label=""
      while [ $# -gt 0 ]; do
        case "$1" in
          --label) label="${2:-}"; shift 2 ;;
          *) log ERROR "未知选项:$1"; usage; exit 1 ;;
        esac
      done
      do_backup "$label"
      ;;
    restore)
      local label="" target=""
      while [ $# -gt 0 ]; do
        case "$1" in
          --label) label="${2:-}"; shift 2 ;;
          --target) target="${2:-}"; shift 2 ;;
          *) log ERROR "未知选项:$1"; usage; exit 1 ;;
        esac
      done
      if [ -z "$label" ]; then
        log ERROR "restore 需要 --label"; exit 1
      fi
      do_restore "$label" "$target"
      ;;
    verify)
      local file=""
      while [ $# -gt 0 ]; do
        case "$1" in
          --file) file="${2:-}"; shift 2 ;;
          *) log ERROR "未知选项:$1"; usage; exit 1 ;;
        esac
      done
      if [ -z "$file" ]; then
        log ERROR "verify 需要 --file"; exit 1
      fi
      # 为 verify 使用 BACKUP_DIR 推导结构
      precheck
      verify_backup_file "$file"
      ;;
    schedule)
      local action=""
      while [ $# -gt 0 ]; do
        case "$1" in
          --print) action="print"; shift ;;
          --install) action="install"; shift ;;
          *) log ERROR "未知选项:$1"; usage; exit 1 ;;
        esac
      done
      case "$action" in
        print) print_cron_line ;;
        install) install_user_cron ;;
        *) usage; exit 1 ;;
      esac
      ;;
    *)
      log ERROR "未知子命令:$sub"
      usage; exit 1
      ;;
  esac
}

main "$@"

参数说明

  • SOURCE_DIR

    • 含义:需要备份的源目录
    • 默认值:/opt/data
    • 取值范围:任意存在且可读的目录路径
  • BACKUP_ROOT

    • 含义:备份文件的根存放路径,实际备份会存到 BACKUP_ROOT/$(basename SOURCE_DIR)
    • 默认值:/var/backups
    • 取值范围:任意可写目录路径
  • RETENTION_DAYS

    • 含义:备份保留天数;超过则迁移到 .expired 子目录
    • 默认值:7
    • 取值范围:正整数
  • CHECKSUM_ALGO

    • 含义:校验算法(实现为 sha256)
    • 默认值:sha256
    • 取值范围:固定为 sha256(脚本中写死,变量仅用于文件后缀)
  • ENCRYPTION_ALGO

    • 含义:OpenSSL 加密算法
    • 默认值:aes-256-cbc
    • 取值范围:建议 aes-256-cbc(已启用 PBKDF2 与 salt)
  • KEY_PATH

    • 含义:OpenSSL -pass file 使用的密钥文件路径
    • 默认值:/etc/secure/key
    • 取值范围:存在且可读的文件(建议权限 400/600)
  • PRECHECK_MIN_FREE_PERCENT

    • 含义:备份目标所在分区的最低剩余百分比阈值
    • 默认值:20
    • 取值范围:0-100 的整数
  • LOG_FILE

    • 含义:日志输出文件
    • 默认值:/var/log/safe_backup.log
    • 取值范围:可写路径
  • 命令行参数

    • backup --label LABEL:备份并将 LABEL 写入文件名与元数据(可选)
    • restore --label LABEL [--target DIR]:按标签查找最新备份并恢复到 DIR(未指定则自动生成暂存目录)
    • verify --file FILE:校验指定备份文件的 SHA256
    • schedule --print/--install:输出或安装每日 02:00 的 cron 行(当前用户)

使用示例

  1. 立即执行一次带标签的备份(标签:daily)
  • 命令:
    SOURCE_DIR=/opt/data BACKUP_ROOT=/var/backups KEY_PATH=/etc/secure/key ./safe_backup.sh backup --label daily
  1. 按标签恢复到指定暂存路径(不覆盖生产)
  • 命令:
    ./safe_backup.sh restore --label daily --target /tmp/restore_daily_check
  1. 生成并安装每日 02:00 的计划任务到当前用户
  • 命令:
    ./safe_backup.sh schedule --print
    ./safe_backup.sh schedule --install

注意事项

  • 执行前的准备工作

    • 确认依赖命令已安装:tar、gzip、openssl、sha256sum、cron(如需计划任务)
    • 确认密钥文件存在且权限合理(建议 400/600),避免被非授权用户读取
    • 确保 BACKUP_ROOT 和日志文件路径可写
    • 根据实际需要调整环境变量(SOURCE_DIR、BACKUP_ROOT、RETENTION_DAYS 等)
  • 可能的风险和防范措施

    • 数据覆盖与删除风险:脚本的恢复操作仅恢复到暂存目录,不直接覆盖生产目录;保留策略对超期文件采用迁移到 .expired 而非删除,避免误删。对生产替换请人工审核后执行,并建议使用不带删除选项的同步策略。
    • 密钥泄露风险:脚本使用 -pass file 读取密钥,不在命令行暴露明文;请对密钥文件进行访问控制与审计。
    • 磁盘空间风险:备份前进行剩余空间预检(>20%),仍建议定期检查 .expired 目录并统一清理归档。
    • 并发风险:脚本使用文件锁避免并发备份导致资源争用。
  • 性能影响和资源消耗说明

    • 压缩与加密会占用 CPU;大目录备份时建议在业务低峰时段执行(本脚本默认每日 02:00)
    • 备份写入与校验会产生显著 I/O;建议将 BACKUP_ROOT 放置在容量与吞吐较好的存储上
    • 如需进一步优化,可考虑分片备份、增量备份或并行压缩工具(例如 pigz),但需在企业规范允许范围内评估后再引入

如需根据企业规范调整(例如恢复上线自动化、归档清理策略),可在现有框架中增加审批/交互确认步骤与安全审计。

示例详情

适用用户

企业运维工程师

利用提示词快速生成监控、日志采集、备份恢复与性能优化脚本;按环境自动选择语言;输出注释与使用指南,提升交付速度并降低误操作。

DevOps工程师

在发布流水线中一键生成健康检查、滚动备份与资源巡检脚本;结构化日志与错误处理,便于持续集成与问题追踪,缩短故障修复时间。

中小企业系统管理员

不必深度编程,即可建立定时备份、磁盘空间告警、服务自检与日志打包脚本;标准化配置参数,快速覆盖Linux与Windows主机。

解决的问题

让运维团队在几分钟内从“需求描述”直达“可执行脚本”,覆盖常见场景(如监控告警、日志排障、数据备份与回滚、资源调优、安全巡检),以更少的人力和更低风险完成重复与关键操作。通过标准化、可配置、带注释的脚本生成,显著缩短交付周期,降低误操作概率,提升跨系统环境(Linux/Windows)的稳定性与合规性,最终实现效率提升与成本优化,促进试用转为付费。

特征总结

根据服务器类型与任务场景,轻松生成可直接执行的运维脚本,减少手写成本与上线等待。
自动应用安全规范与最佳实践,内置风险防护与权限提示,降低误操作和合规隐患。
支持系统监控、日志分析、备份恢复与性能优化,一键切换场景模板快速落地。
自适应Linux与Windows环境,智能选择脚本语言与命令集,避免兼容性踩坑。
自动注释与使用说明齐备,开箱即跑,新人也能快速理解参数与执行步骤。
结构化错误处理与日志记录内置,问题可追踪可复盘,减少线上排查时间。
可按需定制参数与模块,批量生成标准化脚本,便于团队复用与版本管理。
生成前进行需求梳理与实现路径评估,提前提示风险与资源占用,稳妥上线。
提供多场景演示命令与范例,复制即用,缩短从方案到生产的准备时间。
全流程校验与性能优化建议,让脚本更稳更快,提升关键运维任务完成率。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 698 tokens
- 3 个可调节参数
{ 服务器类型 } { 任务类型 } { 配置参数 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费

不要错过!

免费获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59