热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词专为IT运维人员设计,能够根据用户输入的服务器类型、运维任务类型和具体配置参数,自动生成专业、安全、高效的运维脚本。通过深度分析用户需求,结合最佳实践和安全性考量,提供可立即执行的脚本代码,显著提升运维效率,降低人为错误风险,确保系统稳定运行。支持多种主流操作系统和常见运维场景,包括系统监控、日志分析、备份恢复、性能优化等任务。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Linux系统监控脚本(基于 node_exporter)
- 指标:CPU、内存、磁盘、进程Top
- 告警:CPU/内存阈值触发邮件通知
- 日志:RotatingFileHandler 至指定目录
- 安全:不包含破坏性操作,不硬编码敏感信息
作者:原创
"""
import argparse
import os
import sys
import time
import ssl
import smtplib
import socket
import signal
import subprocess
import re
import threading
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from logging.handlers import RotatingFileHandler
import logging
# Module-wide stop flag: set by the SIGINT/SIGTERM handler (graceful_exit)
# and polled/waited on by the monitoring loop for a clean shutdown.
STOP_EVENT = threading.Event()
# Pseudo/virtual filesystem types that should not count toward disk usage.
DEFAULT_EXCLUDE_FSTYPE = [
    "tmpfs", "devtmpfs", "overlay", "squashfs", "aufs", "ramfs", "iso9660",
    "proc", "sysfs", "cgroup", "cgroup2", "pstore", "debugfs", "bpf", "nsfs"
]
# Mountpoints under these system paths are likewise excluded from disk stats.
DEFAULT_EXCLUDE_MOUNTPOINT_RE = r'^/(proc|sys|dev|run)($|/)'
def setup_logging(log_dir: str, level: str = "INFO") -> logging.Logger:
    """Create (or return the already-configured) shared 'ops_monitor' logger.

    Logs go to <log_dir>/ops_monitor.log via a size-based RotatingFileHandler
    (10 MiB x 5 backups, UTF-8) and are mirrored to stdout.

    Args:
        log_dir: directory for the log file; created if missing.
        level: logging level name (DEBUG/INFO/...); unknown names fall back
            to INFO.

    Returns:
        The module logger (a singleton by name).
    """
    os.makedirs(log_dir, exist_ok=True)
    lvl = getattr(logging, level.upper(), logging.INFO)
    logger = logging.getLogger("ops_monitor")
    logger.setLevel(lvl)
    # Bug fix: this function may be called more than once per process
    # (e.g. again from main()'s exception handler).  The original attached
    # a fresh pair of handlers on every call, duplicating every log line.
    if logger.handlers:
        return logger
    fmt = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    log_file = os.path.join(log_dir, "ops_monitor.log")
    handler = RotatingFileHandler(log_file, maxBytes=10 * 1024 * 1024,
                                  backupCount=5, encoding="utf-8")
    handler.setFormatter(fmt)
    logger.addHandler(handler)
    # Also mirror to the console (optional).
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(fmt)
    console.setLevel(lvl)
    logger.addHandler(console)
    return logger
def graceful_exit(signum, frame):
    # Signal handler (SIGINT/SIGTERM): request a clean shutdown by setting
    # the module-wide stop event, which the monitoring loop waits on.
    STOP_EVENT.set()
def http_get_with_retries(url: str, retries: int, timeout: int, logger: logging.Logger) -> str:
    """Fetch *url* via HTTP GET with simple backoff-and-retry.

    Args:
        url: endpoint to fetch (node_exporter /metrics).
        retries: total number of attempts.
        timeout: per-request timeout in seconds.
        logger: logger used to warn about failed attempts.

    Returns:
        Response body decoded with the response charset (utf-8 fallback).

    Raises:
        RuntimeError: when every attempt fails; wraps the last error.
    """
    last_err = None
    for attempt in range(1, retries + 1):
        try:
            req = Request(url, headers={"Accept": "text/plain"})
            with urlopen(req, timeout=timeout) as resp:
                charset = resp.headers.get_content_charset() or "utf-8"
                return resp.read().decode(charset, errors="replace")
        except (HTTPError, URLError, TimeoutError) as e:
            last_err = e
            logger.warning("HTTP GET failed (attempt %d/%d): %s", attempt, retries, str(e))
            # Back off before the next try, but not after the final attempt
            # (the original wasted up to 10 s sleeping before raising).
            if attempt < retries:
                time.sleep(min(2 * attempt, 10))
    raise RuntimeError(f"Failed to fetch metrics from {url}: {last_err}")
def parse_prometheus_text(text: str):
    """Parse Prometheus text exposition into {name: [(labels_dict, value), ...]}.

    Only the simple ``name{labels} value`` / ``name value`` forms are handled;
    comments (#), blank lines and unparseable lines are skipped.  Label values
    with common escapes (\\", \\n, \\\\) are unescaped.

    Bug fix: the original value regex accepted ``Inf``/``-Inf`` but not
    ``+Inf``, which node_exporter emits (e.g. the top histogram bucket), so
    those samples were silently dropped.
    """
    metrics = {}
    line_re = re.compile(
        r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+'
        r'([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?|NaN|[+-]?Inf)\s*$'
    )
    # Simple k="v" pairs; exotic escaping is not handled, only common formats.
    label_re = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:\\.|[^"\\])*)"')
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = line_re.match(line)
        if not m:
            # Skip lines we cannot parse.
            continue
        name, label_str, value_str = m.groups()
        labels = {}
        if label_str:
            # Strip the surrounding braces, then pick out the k="v" pairs.
            for k, v in label_re.findall(label_str[1:-1]):
                labels[k] = bytes(v, "utf-8").decode("unicode_escape")
        try:
            # float() natively accepts "NaN", "Inf", "+Inf" and "-Inf".
            value = float(value_str)
        except ValueError:
            continue
        metrics.setdefault(name, []).append((labels, value))
    return metrics
def compute_cpu_usage(prev_metrics: dict, cur_metrics: dict, logger: logging.Logger) -> float:
    """Derive overall CPU utilisation (%) from two node_cpu_seconds_total snapshots.

    usage = (1 - delta_idle / delta_total) * 100, clamped to [0, 100].
    Returns NaN when the total delta is not positive (e.g. identical snapshots).
    """

    def _accumulate(snapshot, mode=None):
        acc = 0.0
        for lbls, v in snapshot.get("node_cpu_seconds_total", []):
            # Some systems expose an aggregate 'cpu="cpu"' series; skip it and
            # sum over the per-core series instead.
            if lbls.get("cpu") == "cpu":
                continue
            if mode is None or lbls.get("mode") == mode:
                acc += v
        return acc

    delta_idle = _accumulate(cur_metrics, "idle") - _accumulate(prev_metrics, "idle")
    delta_total = _accumulate(cur_metrics) - _accumulate(prev_metrics)
    if delta_total <= 0:
        logger.debug("CPU delta_total <= 0, skip calculation")
        return float("nan")
    return max(0.0, min(100.0, (1.0 - (delta_idle / delta_total)) * 100.0))
def compute_memory_usage(cur_metrics: dict) -> float:
    """Memory utilisation (%) as 1 - MemAvailable/MemTotal, clamped to [0, 100].

    Returns NaN when either gauge is missing or MemTotal is non-positive.
    """

    def _first_value(name):
        entries = cur_metrics.get(name, [])
        return entries[0][1] if entries else None

    available = _first_value("node_memory_MemAvailable_bytes")
    total = _first_value("node_memory_MemTotal_bytes")
    if available is None or total is None or total <= 0:
        return float("nan")
    return max(0.0, min(100.0, (1.0 - (available / total)) * 100.0))
def compute_disk_usages(cur_metrics: dict, exclude_fstypes, exclude_mount_re: str):
    """Per-mountpoint disk utilisation from node_filesystem_* gauges.

    usage = (1 - avail/size) * 100, clamped to [0, 100].

    Args:
        cur_metrics: parsed metrics snapshot.
        exclude_fstypes: filesystem types to skip (tmpfs, proc, ...).
        exclude_mount_re: regex; matching mountpoints are skipped (may be
            empty/None to disable).

    Returns:
        {mountpoint: (usage_percent, device, fstype)}
    """

    def _key(lbl):
        return (lbl.get("device", ""), lbl.get("fstype", ""), lbl.get("mountpoint", ""))

    sizes = {_key(l): v for l, v in cur_metrics.get("node_filesystem_size_bytes", [])}
    avails = {_key(l): v for l, v in cur_metrics.get("node_filesystem_avail_bytes", [])}
    mount_filter = re.compile(exclude_mount_re) if exclude_mount_re else None
    usages = {}
    for (device, fstype, mount), size in sizes.items():
        if not mount or not fstype or fstype in exclude_fstypes:
            continue
        if mount_filter and mount_filter.search(mount):
            continue
        avail = avails.get((device, fstype, mount))
        if avail is None or size <= 0:
            continue
        pct = max(0.0, min(100.0, (1.0 - (avail / size)) * 100.0))
        usages[mount] = (pct, device, fstype)
    return usages
def get_top_processes(n: int, logger: logging.Logger):
    """Return the top-N CPU-consuming processes via ``ps``.

    Not a node_exporter metric; used as auxiliary data when triaging alerts.

    Args:
        n: maximum number of processes to return.
        logger: logger for warnings.

    Returns:
        [{'pid': int, 'comm': str, 'cpu': float, 'mem': float}, ...]
        (empty list when ``ps`` itself fails).
    """
    try:
        # -eo: explicit columns, --sort=-%cpu: CPU descending.
        cmd = ["ps", "-eo", "pid,comm,%cpu,%mem", "--sort=-%cpu"]
        out = subprocess.check_output(cmd, text=True)
    except Exception as e:
        logger.warning("Failed to get top processes: %s", e)
        return []
    data = []
    for ln in out.strip().splitlines()[1:]:  # skip the header row
        fields = ln.split()
        if len(fields) < 4:
            continue
        try:
            # Parse from both ends: a command name containing spaces
            # (e.g. "Web Content") would shift the columns under the
            # original split(None, 3), and the resulting ValueError
            # aborted the entire listing instead of one line.
            data.append({
                "pid": int(fields[0]),
                "comm": " ".join(fields[1:-2]),
                "cpu": float(fields[-2]),
                "mem": float(fields[-1]),
            })
        except ValueError:
            continue
        if len(data) >= n:
            break
    return data
def send_mail(
    smtp_host: str,
    smtp_port: int,
    use_tls: bool,
    mail_from: str,
    mail_to: list,
    subject: str,
    body: str,
    retries: int,
    smtp_user: str = None,
    smtp_pass: str = None,
    logger: logging.Logger = None
):
    """Send a plain-text alert mail with retry; failures are logged, not raised.

    Args:
        smtp_host/smtp_port: SMTP server address.
        use_tls: upgrade the connection with STARTTLS before (optional) login.
        mail_from/mail_to: sender and recipient list (no-op when empty).
        subject/body: message content (UTF-8).
        retries: total number of send attempts.
        smtp_user/smtp_pass: optional credentials; login only when both set.
        logger: optional logger for progress/errors.
    """
    if not mail_to:
        if logger:
            logger.warning("No mail recipients specified; skip sending.")
        return
    msg = MIMEText(body, _charset="utf-8")
    msg["Subject"] = subject
    msg["From"] = mail_from
    msg["To"] = ", ".join(mail_to)
    msg["Date"] = formatdate(localtime=True)
    msg["Message-Id"] = make_msgid()
    last_err = None
    for attempt in range(1, retries + 1):
        try:
            # Single connection path for both modes; only the STARTTLS
            # upgrade differs (the original duplicated the whole branch).
            with smtplib.SMTP(smtp_host, smtp_port, timeout=10) as server:
                server.ehlo()
                if use_tls:
                    server.starttls(context=ssl.create_default_context())
                    server.ehlo()
                if smtp_user and smtp_pass:
                    server.login(smtp_user, smtp_pass)
                server.sendmail(mail_from, mail_to, msg.as_string())
            if logger:
                logger.info("Alert mail sent: %s -> %s", subject, mail_to)
            return
        except Exception as e:
            last_err = e
            if logger:
                logger.warning("Send mail failed (attempt %d/%d): %s", attempt, retries, e)
            # No backoff sleep after the final attempt.
            if attempt < retries:
                time.sleep(min(2 * attempt, 10))
    # All attempts failed: record the error and return (monitoring continues).
    if logger:
        logger.error("Failed to send alert mail after %d attempts: %s", retries, last_err)
def build_alert_body(hostname: str, cpu_usage: float, mem_usage: float, disks_alert: dict, top_procs: list) -> str:
    """Render the plain-text alert e-mail body, one item per line."""
    parts = [
        f"Host: {hostname}",
        f"CPU Usage: {cpu_usage:.2f}%",
        f"Memory Usage: {mem_usage:.2f}%",
    ]
    if disks_alert:
        parts.append("Disk Partitions Exceeded Threshold:")
        parts.extend(
            f"  - {mp} ({device}, {fstype}): {usage:.2f}%"
            for mp, (usage, device, fstype) in disks_alert.items()
        )
    if top_procs:
        parts.append("Top Processes by CPU:")
        parts.extend(
            f"  PID {p['pid']} {p['comm']} - CPU {p['cpu']:.2f}% MEM {p['mem']:.2f}%"
            for p in top_procs
        )
    parts.append("")
    parts.append("This is an automated message.")
    return "\n".join(parts)
def run_monitor(args):
    """Main monitoring loop: scrape node_exporter, log metrics, send alerts.

    Runs until STOP_EVENT is set (SIGINT/SIGTERM).  Scrape failures are
    logged (and optionally mailed) but never terminate the loop.
    """
    logger = setup_logging(args.log_dir, args.log_level)
    hostname = socket.gethostname()
    logger.info("Starting ops_monitor on host: %s", hostname)
    logger.info("Node exporter endpoint: %s", args.node_exporter_url)
    logger.info("Monitoring interval: %ds | thresholds: CPU>%s%%, MEM>%s%%, DISK>%s%%",
                args.interval, args.cpu_threshold, args.mem_threshold, args.disk_threshold)
    # Initial scrape: serves as the "previous" snapshot for CPU deltas.
    try:
        prev_text = http_get_with_retries(args.node_exporter_url, args.retry, args.timeout, logger)
        prev_metrics = parse_prometheus_text(prev_text)
        prev_time = time.time()
    except Exception as e:
        logger.error("Initial metrics fetch failed: %s", e)
        # Do not terminate; wait for the next cycle.
        prev_metrics = None
        prev_time = None
    # Main loop.
    while not STOP_EVENT.is_set():
        start_ts = time.time()
        # Sleep until the next cycle (at least 1 second); waiting on the
        # event lets a signal interrupt the sleep immediately.
        remaining = args.interval - (start_ts - (prev_time or start_ts))
        sleep_sec = max(1.0, remaining)
        STOP_EVENT.wait(sleep_sec)
        if STOP_EVENT.is_set():
            break
        # Scrape the current metrics snapshot.
        try:
            cur_text = http_get_with_retries(args.node_exporter_url, args.retry, args.timeout, logger)
            cur_metrics = parse_prometheus_text(cur_text)
            cur_time = time.time()
        except Exception as e:
            logger.error("Metrics fetch failed: %s", e)
            # Optionally mail a failure alert, but keep running.
            if args.fail_alert_email:
                send_mail(
                    args.smtp_host, args.smtp_port, args.smtp_tls,
                    args.mail_from, args.mail_to,
                    subject=f"[OPS ALERT] Metrics fetch failed on {hostname}",
                    body=f"Failed to fetch metrics from {args.node_exporter_url}: {e}",
                    retries=args.retry,
                    smtp_user=args.smtp_user, smtp_pass=args.smtp_pass,
                    logger=logger
                )
            continue
        # CPU usage needs both a previous and a current snapshot.
        cpu_usage = float("nan")
        if prev_metrics:
            cpu_usage = compute_cpu_usage(prev_metrics, cur_metrics, logger)
        else:
            logger.debug("No previous CPU snapshot; will compute from next iteration.")
        # Memory usage.
        mem_usage = compute_memory_usage(cur_metrics)
        # Disk usage per mountpoint.
        disk_usages = compute_disk_usages(
            cur_metrics,
            exclude_fstypes=args.exclude_fstype,
            exclude_mount_re=args.exclude_mount_re
        )
        disks_exceeded = {mp: info for mp, info in disk_usages.items() if info[0] >= args.disk_threshold}
        # Top processes (logged only; no threshold alerting on them).
        top_procs = get_top_processes(args.process_top, logger)
        # Log the readings.  (x != x) is the NaN check.
        cpu_str = "N/A" if cpu_usage != cpu_usage else f"{cpu_usage:.2f}%"
        mem_str = "N/A" if mem_usage != mem_usage else f"{mem_usage:.2f}%"
        logger.info("CPU=%s MEM=%s", cpu_str, mem_str)
        for mp, (usage, device, fstype) in disk_usages.items():
            logger.info("DISK %s (%s,%s)=%.2f%%", mp, device, fstype, usage)
        if top_procs:
            for p in top_procs:
                logger.info("TOP PID=%d COMM=%s CPU=%.2f%% MEM=%.2f%%",
                            p["pid"], p["comm"], p["cpu"], p["mem"])
        # Alert decision (CPU/MEM thresholds; disk threshold independent).
        should_alert = False
        alert_reasons = []
        if cpu_usage == cpu_usage and cpu_usage >= args.cpu_threshold:
            should_alert = True
            alert_reasons.append(f"CPU>{args.cpu_threshold}% (current {cpu_usage:.2f}%)")
        if mem_usage == mem_usage and mem_usage >= args.mem_threshold:
            should_alert = True
            alert_reasons.append(f"MEM>{args.mem_threshold}% (current {mem_usage:.2f}%)")
        if disks_exceeded:
            should_alert = True
            for mp, (usage, _, _) in disks_exceeded.items():
                alert_reasons.append(f"DISK {mp}>{args.disk_threshold}% (current {usage:.2f}%)")
        if should_alert and args.mail_to:
            subject = f"[OPS ALERT] {hostname}: " + "; ".join(alert_reasons)
            body = build_alert_body(hostname, cpu_usage, mem_usage, disks_exceeded, top_procs)
            send_mail(
                args.smtp_host, args.smtp_port, args.smtp_tls,
                args.mail_from, args.mail_to, subject, body,
                retries=args.retry,
                smtp_user=args.smtp_user, smtp_pass=args.smtp_pass,
                logger=logger
            )
        # Save the snapshot for the next iteration's CPU delta.
        prev_metrics = cur_metrics
        prev_time = cur_time
def parse_args():
    """Parse command-line arguments.

    SMTP settings default from OPS_SMTP_* environment variables so that
    credentials never have to appear on the command line.  STARTTLS is on
    by default; the original ``--smtp-tls`` flag (store_true with
    default=True) could never turn it off, so ``--no-smtp-tls`` is added.
    """
    parser = argparse.ArgumentParser(description="Linux 系统监控脚本(基于 node_exporter)")
    parser.add_argument("--node-exporter-url", default="http://127.0.0.1:9100/metrics",
                        help="node_exporter /metrics 地址")
    parser.add_argument("--interval", type=int, default=60, help="监控间隔(秒)")
    parser.add_argument("--cpu-threshold", type=float, default=80.0, help="CPU使用率阈值(百分比)")
    parser.add_argument("--mem-threshold", type=float, default=75.0, help="内存使用率阈值(百分比)")
    parser.add_argument("--disk-threshold", type=float, default=85.0, help="磁盘分区使用率阈值(百分比)")
    parser.add_argument("--process-top", type=int, default=5, help="记录前N个CPU最高的进程数量")
    parser.add_argument("--smtp-host", default=os.environ.get("OPS_SMTP_HOST", "smtp.example.com"),
                        help="SMTP服务器地址(默认读取环境变量 OPS_SMTP_HOST)")
    parser.add_argument("--smtp-port", type=int, default=int(os.environ.get("OPS_SMTP_PORT", "587")),
                        help="SMTP服务器端口(默认读取环境变量 OPS_SMTP_PORT 或 587)")
    # Bug fix: --smtp-tls was store_true with default=True, making the flag
    # a no-op and TLS impossible to disable.  Keep --smtp-tls for backward
    # compatibility and add --no-smtp-tls to actually turn STARTTLS off.
    parser.add_argument("--smtp-tls", dest="smtp_tls", action="store_true", default=True,
                        help="使用STARTTLS(默认启用)")
    parser.add_argument("--no-smtp-tls", dest="smtp_tls", action="store_false",
                        help="禁用STARTTLS")
    parser.add_argument("--smtp-user", default=os.environ.get("OPS_SMTP_USER"),
                        help="SMTP用户名(可使用环境变量 OPS_SMTP_USER)")
    parser.add_argument("--smtp-pass", default=os.environ.get("OPS_SMTP_PASS"),
                        help="SMTP密码(可使用环境变量 OPS_SMTP_PASS)")
    parser.add_argument("--mail-from", default=None,
                        help="告警发件人(默认 ops-monitor@<hostname>)")
    parser.add_argument("--mail-to", nargs="*", default=[],
                        help="告警收件人列表(空则不发送邮件)")
    parser.add_argument("--retry", type=int, default=3, help="自动重试次数(网络/邮件)")
    parser.add_argument("--timeout", type=int, default=10, help="HTTP请求超时(秒)")
    parser.add_argument("--log-dir", default="/var/log/ops", help="日志目录")
    parser.add_argument("--log-level", default="INFO", help="日志级别(DEBUG/INFO/WARN/ERROR)")
    parser.add_argument("--exclude-fstype", nargs="*", default=DEFAULT_EXCLUDE_FSTYPE,
                        help="磁盘过滤的文件系统类型列表(不统计)")
    parser.add_argument("--exclude-mount-re", dest="exclude_mount_re", default=DEFAULT_EXCLUDE_MOUNTPOINT_RE,
                        help="磁盘过滤的挂载点正则(不统计)")
    parser.add_argument("--fail-alert-email", action="store_true", default=False,
                        help="抓取失败时发送故障告警邮件")
    args = parser.parse_args()
    # Derive a sensible default sender from the local hostname.
    if not args.mail_from:
        args.mail_from = f"ops-monitor@{socket.gethostname()}"
    return args
def main():
    # Install signal handlers for a graceful shutdown: SIGINT/SIGTERM set
    # STOP_EVENT, which run_monitor's loop observes.
    signal.signal(signal.SIGINT, graceful_exit)
    signal.signal(signal.SIGTERM, graceful_exit)
    args = parse_args()
    try:
        run_monitor(args)
    except Exception as e:
        # Top-level safety net: make sure the failure reaches the log file,
        # then exit non-zero so supervisors notice.
        logger = setup_logging(args.log_dir, args.log_level)
        logger.exception("Monitor terminated with error: %s", e)
        sys.exit(1)
if __name__ == "__main__":
    main()
python3 ops_monitor.py --mail-to ops-alerts@example.com
python3 ops_monitor.py \
--node-exporter-url http://127.0.0.1:9100/metrics \
--log-dir /var/log/ops \
--fail-alert-email \
--mail-to ops-alerts@example.com ops-backup@example.com
export OPS_SMTP_HOST=smtp.example.com
export OPS_SMTP_PORT=587
export OPS_SMTP_USER=monitor_user
export OPS_SMTP_PASS='your_app_password'
python3 ops_monitor.py \
--cpu-threshold 85 --mem-threshold 80 --disk-threshold 90 \
--mail-to ops-alerts@example.com
<#
.SYNOPSIS
Windows 事件日志关键字分析与按事件ID汇总导出(CSV)
.DESCRIPTION
- 从指定日志(默认 System, Application)提取最近 N 小时事件,时间窗口基于指定时区(默认 UTC+8)
- 过滤消息中包含给定关键字正则(默认 (?i)(error|failed))
- 导出详细事件 CSV 与按事件ID的汇总 CSV
- 输出运行日志到报告目录
.NOTES
兼容 PowerShell 5.1+,不使用已弃用的 Get-EventLog;采用 Get-WinEvent。
#>
# Script parameters (comment-based help for this script appears above).
[CmdletBinding()]
param(
    # Event logs to scan.
    [Parameter(Mandatory=$false)]
    [string[]]$LogNames = @('System','Application'),
    # How many hours to look back (1..720).
    [Parameter(Mandatory=$false)]
    [ValidateRange(1,720)]
    [int]$LookbackHours = 24,
    # Regex applied to each event's formatted message text.
    [Parameter(Mandatory=$false)]
    [string]$KeywordsPattern = '(?i)(error|failed)',
    # Output directory for the CSV reports and the run log.
    [Parameter(Mandatory=$false)]
    [ValidateNotNullOrEmpty()]
    [string]$ReportDir = 'C:\Ops\reports',
    [Parameter(Mandatory=$false)]
    [ValidateNotNullOrEmpty()]
    [string]$TimeZone = 'UTC+8', # Accepts a Windows time zone ID (e.g. "China Standard Time") or "UTC±HH[:mm]"
    [Parameter(Mandatory=$false)]
    [ValidateRange(64,100000)]
    [int]$MaxMessageLength = 4096 # Truncation length for exported messages, keeps the CSV manageable
)
Set-StrictMode -Version Latest
$ErrorActionPreference = 'Stop'
#-------------------- 工具函数 --------------------#
function Write-Log {
    <#
    .SYNOPSIS
        Write a timestamped log line to the console and (if set) the run log file.
    #>
    param(
        [Parameter(Mandatory=$true)][string]$Message,
        [ValidateSet("INFO","WARN","ERROR")][string]$Level = "INFO"
    )
    $stamp = (Get-Date).ToString("yyyy-MM-dd HH:mm:ss.fff")
    $entry = "[{0}] [{1}] {2}" -f $stamp, $Level, $Message
    Write-Host $entry
    if ($script:LogFile) {
        Add-Content -Path $script:LogFile -Value $entry -Encoding UTF8
    }
}
function Get-TimeZoneInfoSafe {
    <#
    .SYNOPSIS
        Resolve a time zone spec into a [System.TimeZoneInfo].
    .DESCRIPTION
        Accepts "Local", a Windows time zone ID (FindSystemTimeZoneById),
        or a "UTC±HH[:mm]" offset (for which a custom zone is created).
        Throws on anything unrecognized (user-facing messages kept as-is).
    #>
    param([Parameter(Mandatory=$true)][string]$TzSpec)
    # Accept "Local" (case-insensitive).
    if ($TzSpec -match '^(?i)local$') {
        return [System.TimeZoneInfo]::Local
    }
    # First try the spec as a Windows time zone ID.
    try {
        return [System.TimeZoneInfo]::FindSystemTimeZoneById($TzSpec)
    } catch {
        # Fall back to parsing a "UTC±HH[:mm]" offset.
        if ($TzSpec -match '^\s*UTC\s*([+-])\s*(\d{1,2})(?::?(\d{2}))?\s*$') {
            $sign = $matches[1]
            $h = [int]$matches[2]
            $m = if ($matches[3]) { [int]$matches[3] } else { 0 }
            if ($h -gt 14 -or $m -ge 60) {
                throw "无效的 UTC 偏移:$TzSpec"
            }
            $offset = New-Object System.TimeSpan($h, $m, 0)
            if ($sign -eq '-') {
                $offset = -$offset
            }
            # Build a stable custom-zone id like "UTC+08:00".
            $id = "UTC{0}{1:00}:{2:00}" -f $sign, [math]::Abs($offset.Hours), [math]::Abs($offset.Minutes)
            return [System.TimeZoneInfo]::CreateCustomTimeZone($id, $offset, $id, $id)
        }
        throw "无法识别的时区:'$TzSpec'。请使用 Windows 时区ID(如 'China Standard Time')或 'UTC±HH[:mm]'。"
    }
}
function Get-TimeWindowInLocal {
    <#
    .SYNOPSIS
        Compute the [now - Hours, now] window and express it in local time.
    .DESCRIPTION
        The window is anchored at "now" in the target time zone, then the
        edges are converted to the machine-local zone so Get-WinEvent can
        filter efficiently on StartTime/EndTime.
    #>
    param(
        [Parameter(Mandatory=$true)][System.TimeZoneInfo]$TargetTz,
        [Parameter(Mandatory=$true)][int]$Hours
    )
    $nowUtc = [DateTime]::UtcNow
    $nowTz = [System.TimeZoneInfo]::ConvertTimeFromUtc($nowUtc, $TargetTz)
    $startTz = $nowTz.AddHours(-$Hours)
    $endTz = $nowTz
    # Convert the window edges into the local zone for event-log filtering.
    $startLocal = [System.TimeZoneInfo]::ConvertTime($startTz, $TargetTz, [System.TimeZoneInfo]::Local)
    $endLocal = [System.TimeZoneInfo]::ConvertTime($endTz, $TargetTz, [System.TimeZoneInfo]::Local)
    return [PSCustomObject]@{
        StartLocal = $startLocal
        EndLocal = $endLocal
        NowTz = $nowTz
        StartTz = $startTz
        EndTz = $endTz
    }
}
function Truncate-String {
    <#
    .SYNOPSIS
        Return $Text clipped to at most $Max characters (null/empty passed through).
    #>
    param(
        [AllowNull()][string]$Text,
        [int]$Max = 4096
    )
    if ([string]::IsNullOrEmpty($Text)) { return $Text }
    if ($Text.Length -gt $Max) {
        return $Text.Substring(0, $Max)
    }
    return $Text
}
#-------------------- Initialization --------------------#
# Script-scoped exit code; set to 2 later if any log cannot be read.
$script:ExitCode = 0
$created = $false
try {
    if (-not (Test-Path -LiteralPath $ReportDir)) {
        New-Item -ItemType Directory -Force -Path $ReportDir | Out-Null
        $created = $true
    }
} catch {
    Write-Host "无法创建报告目录:$ReportDir"
    throw
}
# Timestamped file names so repeated runs never overwrite each other.
$stamp = (Get-Date -Format 'yyyyMMdd_HHmmss')
$DetailCsv = Join-Path $ReportDir ("event_details_{0}.csv" -f $stamp)
$SummaryCsv = Join-Path $ReportDir ("event_summary_byId_{0}.csv" -f $stamp)
$script:LogFile = Join-Path $ReportDir ("run_{0}.log" -f $stamp)
Write-Log "报告目录:$ReportDir(新建:$created)"
Write-Log "详情CSV:$DetailCsv"
Write-Log "汇总CSV:$SummaryCsv"
# Pre-compile the keyword regex (fail fast on an invalid pattern).
try {
    $Regex = [System.Text.RegularExpressions.Regex]::new($KeywordsPattern, [System.Text.RegularExpressions.RegexOptions]::IgnoreCase)
    Write-Log "关键字正则:$KeywordsPattern"
} catch {
    Write-Log "关键字正则无效:$KeywordsPattern" "ERROR"
    throw
}
# Resolve the time zone and compute the scan window.
$tz = Get-TimeZoneInfoSafe -TzSpec $TimeZone
$tw = Get-TimeWindowInLocal -TargetTz $tz -Hours $LookbackHours
Write-Log ("时区:{0} | 窗口({1}h):{2} ~ {3}(按时区)" -f $tz.Id, $LookbackHours, $tw.StartTz.ToString('yyyy-MM-dd HH:mm:ss zzz'), $tw.EndTz.ToString('yyyy-MM-dd HH:mm:ss zzz'))
#-------------------- Collection --------------------#
# Events from all requested logs are pooled into one list; logs that fail
# to read are recorded and reported at the end (exit code 2).
$allEvents = New-Object System.Collections.Generic.List[System.Diagnostics.Eventing.Reader.EventRecord]
$failedLogs = @()
foreach ($ln in $LogNames) {
    try {
        Write-Log "读取日志:$ln(本地时间窗:$($tw.StartLocal) ~ $($tw.EndLocal))"
        $part = Get-WinEvent -FilterHashtable @{ LogName = $ln; StartTime = $tw.StartLocal; EndTime = $tw.EndLocal } -ErrorAction Stop
        foreach ($e in $part) { [void]$allEvents.Add($e) }
        Write-Log ("读取完成:{0} 条" -f ($part | Measure-Object | Select-Object -ExpandProperty Count))
    } catch {
        # A missing/denied log is not fatal: note it, mark exit code 2, move on.
        Write-Log ("读取日志失败:{0} | {1}" -f $ln, $_.Exception.Message) "WARN"
        $failedLogs += $ln
        $script:ExitCode = 2
        continue
    }
}
Write-Log ("合计原始事件:{0}" -f $allEvents.Count)
# Keyword filtering on the formatted message text.
# Perf fix: use a Generic.List instead of `$filtered += $rec` — array +=
# copies the whole array on every append (O(n^2) over large event sets).
$filtered = New-Object System.Collections.Generic.List[object]
foreach ($rec in $allEvents) {
    try {
        $msg = $rec.Message
        if ([string]::IsNullOrEmpty($msg)) { continue }
        if ($Regex.IsMatch($msg)) { [void]$filtered.Add($rec) }
    } catch {
        # Formatting the message can throw for some records; skip those.
        Write-Log ("消息读取异常,已跳过:Log={0}, Id={1}, Time={2}" -f $rec.LogName, $rec.Id, $rec.TimeCreated) "WARN"
        continue
    }
}
Write-Log ("关键字命中事件:{0}" -f $filtered.Count)
#-------------------- Export details --------------------#
# One row per matching event; times are re-expressed in the target zone.
$detailRows = $filtered | ForEach-Object {
    $evtLocalTime = $_.TimeCreated
    $evtTzTime = [System.TimeZoneInfo]::ConvertTime($evtLocalTime, [System.TimeZoneInfo]::Local, $tz)
    [PSCustomObject]@{
        EventTime = $evtTzTime.ToString('yyyy-MM-dd HH:mm:ss zzz')
        TimeZone = $tz.Id
        LogName = $_.LogName
        EventId = $_.Id
        Level = $_.LevelDisplayName
        Provider = $_.ProviderName
        Machine = $_.MachineName
        Keywords = ($_.KeywordsDisplayNames -join ';')
        # Long messages are truncated so the CSV stays manageable.
        Message = Truncate-String -Text $_.Message -Max $MaxMessageLength
    }
}
$detailRows | Export-Csv -Path $DetailCsv -NoTypeInformation -Encoding UTF8
Write-Log ("详情已导出:{0}" -f $DetailCsv)
#-------------------- Export summary (grouped by event ID) --------------------#
# One row per distinct event ID, ordered by hit count descending.
$summaryRows =
    $filtered |
    Group-Object -Property Id |
    Sort-Object -Property Count -Descending |
    ForEach-Object {
        $group = $_.Group
        # First/last occurrence within the window (local time, then target zone).
        $firstLocal = $group | Sort-Object TimeCreated | Select-Object -First 1 -ExpandProperty TimeCreated
        $lastLocal = $group | Sort-Object TimeCreated -Descending | Select-Object -First 1 -ExpandProperty TimeCreated
        $firstTz = [System.TimeZoneInfo]::ConvertTime($firstLocal, [System.TimeZoneInfo]::Local, $tz)
        $lastTz = [System.TimeZoneInfo]::ConvertTime($lastLocal, [System.TimeZoneInfo]::Local, $tz)
        [PSCustomObject]@{
            EventId = $_.Name
            Count = $_.Count
            LogNames = ($group | Select-Object -ExpandProperty LogName -Unique | Sort-Object | Out-String).Trim() -replace '\r?\n', '; '
            FirstSeen = $firstTz.ToString('yyyy-MM-dd HH:mm:ss zzz')
            LastSeen = $lastTz.ToString('yyyy-MM-dd HH:mm:ss zzz')
            Providers = ($group | Select-Object -ExpandProperty ProviderName -Unique | Sort-Object | Out-String).Trim() -replace '\r?\n', '; '
            Levels = ($group | Select-Object -ExpandProperty LevelDisplayName -Unique | Sort-Object | Out-String).Trim() -replace '\r?\n', '; '
        }
    }
$summaryRows | Export-Csv -Path $SummaryCsv -NoTypeInformation -Encoding UTF8
Write-Log ("汇总已导出:{0}" -f $SummaryCsv)
if ($failedLogs.Count -gt 0) {
    Write-Log ("注意:以下日志读取失败且已跳过 -> {0}" -f ($failedLogs -join ', ')) "WARN"
}
Write-Log "完成"
# Exit code: 0 = success, 2 = at least one log could not be read.
exit $script:ExitCode
LogNames
LookbackHours
KeywordsPattern
ReportDir
TimeZone
MaxMessageLength
退出码
执行前准备
关键点与防范
性能与资源
合规与安全
功能描述:
该脚本用于对指定目录进行定期备份,支持压缩(tar.gz)、加密(AES-256, OpenSSL)、校验(SHA256),并按照快照标签标识备份版本。脚本在备份前进行磁盘剩余空间预检(>20%),自动进行保留策略(7天)管理(安全迁移为过期区而非直接删除),提供按标签进行恢复到安全的“还原暂存目录”以便人工核验后再上线。支持生成或安装每日 02:00 的计划任务(cron)。
适用环境:
执行权限:
#!/usr/bin/env bash
# safe_backup.sh
# 安全备份与按标签恢复脚本(tar.gz + AES-256 + SHA256)
# - 预检磁盘剩余 > 20%
# - 备份文件 SHA256 校验
# - 备份加密(AES-256-CBC, PBKDF2, salt)
# - 保留策略:超期备份迁移到 .expired/ 目录(不直接删除)
# - 按快照标签恢复到安全暂存目录(不覆盖生产数据)
# - 可输出/安装每日 02:00 的 cron 任务
set -euo pipefail
# ========= Default configuration (override via environment or CLI) =========
SOURCE_DIR="${SOURCE_DIR:-/opt/data}"                        # Source directory to back up
BACKUP_ROOT="${BACKUP_ROOT:-/var/backups}"                   # Root of the backup target
RETENTION_DAYS="${RETENTION_DAYS:-7}"                        # Days to keep backups
CHECKSUM_ALGO="${CHECKSUM_ALGO:-sha256}"                     # Checksum algorithm (fixed: sha256)
# BUG FIX: the original ENCRYPTION_ALGO line had no space before '#', so the
# '#' was parsed as part of the value ("aes-256-cbc#") and the trailing
# comment text was executed as a command, aborting the script under `set -e`.
ENCRYPTION_ALGO="${ENCRYPTION_ALGO:-aes-256-cbc}"            # Encryption algorithm
KEY_PATH="${KEY_PATH:-/etc/secure/key}"                      # Passphrase file for openssl
PRECHECK_MIN_FREE_PERCENT="${PRECHECK_MIN_FREE_PERCENT:-20}" # Minimum free disk space (%)
LOG_FILE="${LOG_FILE:-/var/log/safe_backup.log}"             # Log file
# Compression format is fixed to tar.gz
COMPRESSION_EXT="tar.gz"
# Prefix for backup file names
BACKUP_PREFIX="backup"
# ========= Logging and helpers =========
# Backups, metadata and logs may hold sensitive data: owner-only permissions.
umask 077
log() {
    # Emit "[timestamp] [LEVEL] message" to stdout and append it to $LOG_FILE.
    local level="$1"
    shift
    local now
    now="$(date '+%Y-%m-%d %H:%M:%S')"
    echo "[$now] [$level] $*" | tee -a "$LOG_FILE"
}
require_cmd() {
    # Abort the script if the given command is not available on PATH.
    local name="$1"
    command -v "$name" >/dev/null 2>&1 && return 0
    log ERROR "命令未找到: $name"
    exit 1
}
# ========= 预检 =========
precheck() {
# 检查依赖与环境
for c in tar gzip openssl sha256sum df find awk sed date mkdir ls; do
require_cmd "$c"
done
if [ ! -d "$SOURCE_DIR" ]; then
log ERROR "源目录不存在: $SOURCE_DIR"
exit 1
fi
mkdir -p "$BACKUP_ROOT" || true
# 备份子目录(按照源目录名归档)
local src_base
src_base="$(basename "$SOURCE_DIR")"
BACKUP_DIR="${BACKUP_DIR:-$BACKUP_ROOT/$src_base}"
EXPIRED_DIR="$BACKUP_DIR/.expired"
RESTORE_DIR_ROOT="$BACKUP_DIR/restores"
mkdir -p "$BACKUP_DIR" "$EXPIRED_DIR" "$RESTORE_DIR_ROOT"
# 磁盘剩余空间检查(基于 BACKUP_ROOT 所在分区)
local used_percent free_percent
used_percent="$(df -P "$BACKUP_ROOT" | awk 'NR==2{gsub("%","",$5); print $5}')"
free_percent=$((100 - used_percent))
if [ "$free_percent" -lt "$PRECHECK_MIN_FREE_PERCENT" ]; then
log ERROR "磁盘剩余不足: ${free_percent}% < ${PRECHECK_MIN_FREE_PERCENT}% (挂载点: $BACKUP_ROOT)"
exit 1
fi
# 密钥文件检查
if [ ! -f "$KEY_PATH" ]; then
log ERROR "密钥文件不存在: $KEY_PATH"
exit 1
fi
# 权限提示(不强制退出)
local perm
perm="$(stat -c '%a' "$KEY_PATH" 2>/dev/null || echo '?')"
log INFO "密钥文件: $KEY_PATH (权限: $perm)"
# 加锁避免并发
LOCK_FILE="$BACKUP_DIR/.backup.lock"
exec 9>"$LOCK_FILE"
if ! flock -n 9; then
log ERROR "已有备份进程在运行(锁文件: $LOCK_FILE)"
exit 1
fi
}
# ========= 保留策略(安全迁移) =========
expire_old_backups() {
# 将超期备份迁移到 .expired/<timestamp>/ 目录中,避免直接删除
local ts exp_dir
ts="$(date '+%Y%m%d_%H%M%S')"
exp_dir="$EXPIRED_DIR/$ts"
mkdir -p "$exp_dir"
log INFO "开始执行保留策略:迁移超过 ${RETENTION_DAYS} 天的备份到 $exp_dir"
# 只处理符合命名规范的备份文件
while IFS= read -r -d '' f; do
local base meta sum
base="$(basename "$f")"
meta="${f%.*}.meta.json"
sum="$f.$CHECKSUM_ALGO"
for item in "$f" "$meta" "$sum"; do
if [ -f "$item" ]; then
log INFO "迁移过期文件: $(basename "$item")"
mv "$item" "$exp_dir/"
fi
done
done < <(find "$BACKUP_DIR" -maxdepth 1 -type f -name "${BACKUP_PREFIX}-*.${COMPRESSION_EXT}.enc" -mtime +"$RETENTION_DAYS" -print0)
log INFO "保留策略完成:请人工确认 $exp_dir 后再统一清理。"
}
# ========= 生成文件名 =========
sanitize_label() {
echo "$1" | sed 's/[^A-Za-z0-9_-]/_/g'
}
build_backup_paths() {
    # Derive the output paths for a new backup and export them as globals:
    #   BACKUP_NAME    backup-<timestamp>[-<label>].tar.gz.enc
    #   BACKUP_PATH    full path under $BACKUP_DIR
    #   META_PATH      same stem with ".enc" stripped plus ".meta.json"
    #   CHECKSUM_PATH  "$BACKUP_PATH.sha256"
    # Requires precheck() to have set BACKUP_DIR.
    local label="$1"
    local ts
    ts="$(date '+%Y%m%d_%H%M%S')"
    local suffix=""
    if [ -n "$label" ]; then
        suffix="-$label"
    fi
    BACKUP_NAME="${BACKUP_PREFIX}-${ts}${suffix}.${COMPRESSION_EXT}.enc"
    BACKUP_PATH="$BACKUP_DIR/$BACKUP_NAME"
    META_PATH="${BACKUP_PATH%.*}.meta.json"
    CHECKSUM_PATH="$BACKUP_PATH.$CHECKSUM_ALGO"
}
# ========= 备份 =========
do_backup() {
local raw_label="${1:-}" label
label="$(sanitize_label "$raw_label")"
precheck
build_backup_paths "$label"
log INFO "开始备份:源=$SOURCE_DIR 目标=$BACKUP_PATH 标签=${label:-无}"
# tar.gz -> openssl 加密到文件
# 注意:-pass file:$KEY_PATH 不在命令行暴露明文口令
tar -C "$SOURCE_DIR" -czf - . \
| openssl enc -"${ENCRYPTION_ALGO}" -salt -pbkdf2 -pass "file:$KEY_PATH" -out "$BACKUP_PATH"
log INFO "生成校验($CHECKSUM_ALGO):$CHECKSUM_PATH"
sha256sum "$BACKUP_PATH" > "$CHECKSUM_PATH"
log INFO "写入元数据:$META_PATH"
cat > "$META_PATH" <<EOF
{
"source_dir": "$(printf '%s' "$SOURCE_DIR")",
"backup_dir": "$(printf '%s' "$BACKUP_DIR")",
"backup_file": "$(printf '%s' "$BACKUP_NAME")",
"timestamp": "$(date '+%Y-%m-%d %H:%M:%S %Z')",
"label": "$(printf '%s' "${label}")",
"compression": "tar.gz",
"encryption": "AES-256-CBC + PBKDF2 + salt (OpenSSL)",
"checksum_algo": "sha256",
"checksum_file": "$(basename "$CHECKSUM_PATH")"
}
EOF
log INFO "备份完成:$BACKUP_PATH"
expire_old_backups
}
# ========= 校验 =========
verify_backup_file() {
local file="$1"
local sum_file="${file}.$CHECKSUM_ALGO"
if [ ! -f "$sum_file" ]; then
log ERROR "校验文件不存在:$sum_file"
exit 1
fi
log INFO "校验备份:$file"
(cd "$(dirname "$file")" && sha256sum -c "$(basename "$sum_file")")
log INFO "校验通过:$file"
}
# ========= 查找最新指定标签的备份 =========
find_latest_by_label() {
local label="$1"
local pattern
# 例如 backup-*-daily.tar.gz.enc
pattern="${BACKUP_PREFIX}-*-${label}.${COMPRESSION_EXT}.enc"
local latest
latest="$(ls -1t "$BACKUP_DIR"/$pattern 2>/dev/null | head -n 1 || true)"
if [ -z "$latest" ]; then
log ERROR "未找到指定标签的备份:$label"
exit 1
fi
echo "$latest"
}
# ========= 恢复(到安全暂存目录) =========
do_restore() {
local raw_label="${1:-}"
local target_dir_opt="${2:-}" # 可选,指定还原目标目录
local label
label="$(sanitize_label "$raw_label")"
precheck
local backup_file
backup_file="$(find_latest_by_label "$label")"
verify_backup_file "$backup_file"
local ts
ts="$(date '+%Y%m%d_%H%M%S')"
local restore_target
if [ -n "$target_dir_opt" ]; then
restore_target="$target_dir_opt"
else
restore_target="$RESTORE_DIR_ROOT/${ts}_${label}"
fi
mkdir -p "$restore_target"
log INFO "开始恢复到暂存目录:$restore_target (不会覆盖生产目录)"
openssl enc -"${ENCRYPTION_ALGO}" -d -pbkdf2 -pass "file:$KEY_PATH" -in "$backup_file" \
| tar -xzf - -C "$restore_target"
log INFO "恢复完成:$restore_target"
log INFO "请核验后再进行人工替换或同步至生产目录(建议使用无删除策略的同步方式)。"
}
# ========= 计划任务(cron) =========
print_cron_line() {
# 每日 02:00 执行备份,标签为 daily
local script_path="${SCRIPT_PATH:-$(realpath "$0")}"
echo "0 2 * * * ${script_path} backup --label daily >> ${LOG_FILE} 2>&1"
}
install_user_cron() {
    # Append the daily cron line to the current user's crontab, preserving
    # existing entries.  NOTE(review): repeated installs append duplicate
    # lines; consider de-duplicating before piping to `crontab -`.
    local line
    line="$(print_cron_line)"
    log INFO "将如下行安装到当前用户的 crontab:"
    echo "$line"
    (crontab -l 2>/dev/null; echo "$line") | crontab -
    log INFO "安装完成(当前用户)。"
}
# ========= 使用帮助 =========
usage() {
cat <<EOF
用法:$0 <subcommand> [选项]
子命令:
backup [--label LABEL] 执行备份(tar.gz + AES-256 + sha256),并执行保留策略
restore --label LABEL [--target DIR]
按标签恢复到安全暂存目录或指定目录(不覆盖生产)
schedule --print 输出每日 02:00 的 cron 行
schedule --install 安装到当前用户的 crontab,每日 02:00 执行
verify --file /path/to/backup.tar.gz.enc
对指定备份文件进行 sha256 校验
环境变量(或配置):
SOURCE_DIR 源目录(默认 /opt/data)
BACKUP_ROOT 备份根(默认 /var/backups)
RETENTION_DAYS 保留天数(默认 7)
KEY_PATH 密钥路径(默认 /etc/secure/key)
PRECHECK_MIN_FREE_PERCENT 磁盘剩余阈值(默认 20)
LOG_FILE 日志文件(默认 /var/log/safe_backup.log)
EOF
}
# ========= Argument parsing / dispatch =========
main() {
    # Dispatch on the first argument (backup|restore|verify|schedule), then
    # parse each subcommand's own options; unknown input prints usage, exit 1.
    if [ $# -lt 1 ]; then
        usage; exit 1
    fi
    local sub="$1"; shift
    case "$sub" in
        backup)
            local label=""
            while [ $# -gt 0 ]; do
                case "$1" in
                    --label) label="${2:-}"; shift 2 ;;
                    *) log ERROR "未知选项:$1"; usage; exit 1 ;;
                esac
            done
            do_backup "$label"
            ;;
        restore)
            local label="" target=""
            while [ $# -gt 0 ]; do
                case "$1" in
                    --label) label="${2:-}"; shift 2 ;;
                    --target) target="${2:-}"; shift 2 ;;
                    *) log ERROR "未知选项:$1"; usage; exit 1 ;;
                esac
            done
            if [ -z "$label" ]; then
                log ERROR "restore 需要 --label"; exit 1
            fi
            do_restore "$label" "$target"
            ;;
        verify)
            local file=""
            while [ $# -gt 0 ]; do
                case "$1" in
                    --file) file="${2:-}"; shift 2 ;;
                    *) log ERROR "未知选项:$1"; usage; exit 1 ;;
                esac
            done
            if [ -z "$file" ]; then
                log ERROR "verify 需要 --file"; exit 1
            fi
            # precheck establishes the BACKUP_DIR layout (and lock) for verify.
            precheck
            verify_backup_file "$file"
            ;;
        schedule)
            local action=""
            while [ $# -gt 0 ]; do
                case "$1" in
                    --print) action="print"; shift ;;
                    --install) action="install"; shift ;;
                    *) log ERROR "未知选项:$1"; usage; exit 1 ;;
                esac
            done
            case "$action" in
                print) print_cron_line ;;
                install) install_user_cron ;;
                *) usage; exit 1 ;;
            esac
            ;;
        *)
            log ERROR "未知子命令:$sub"
            usage; exit 1
            ;;
    esac
}
main "$@"
SOURCE_DIR
BACKUP_ROOT
RETENTION_DAYS
CHECKSUM_ALGO
ENCRYPTION_ALGO
KEY_PATH
PRECHECK_MIN_FREE_PERCENT
LOG_FILE
命令行参数
执行前的准备工作
可能的风险和防范措施
性能影响和资源消耗说明
如需根据企业规范调整(例如恢复上线自动化、归档清理策略),可在现有框架中增加审批/交互确认步骤与安全审计。
利用提示词快速生成监控、日志采集、备份恢复与性能优化脚本;按环境自动选择语言;输出注释与使用指南,提升交付速度并降低误操作。
在发布流水线中一键生成健康检查、滚动备份与资源巡检脚本;结构化日志与错误处理,便于持续集成与问题追踪,缩短故障修复时间。
不必深度编程,即可建立定时备份、磁盘空间告警、服务自检与日志打包脚本;标准化配置参数,快速覆盖Linux与Windows主机。
让运维团队在几分钟内从“需求描述”直达“可执行脚本”,覆盖常见场景(如监控告警、日志排障、数据备份与回滚、资源调优、安全巡检),以更少的人力和更低风险完成重复与关键操作。通过标准化、可配置、带注释的脚本生成,显著缩短交付周期,降低误操作概率,提升跨系统环境(Linux/Windows)的稳定性与合规性,最终实现效率提升与成本优化,促进试用转为付费。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期