Helps developers create a Python script that automatically updates a website's XML sitemap, with change detection and deployment support, while keeping the code clean and performant.
The following solution and sample code target a Python + Django site hosted on Nginx. They cover intelligent change detection, sitemap generation, and deployment to the web root, with attention to efficiency and extensibility.
Overall workflow: collect URLs from the configured sources, hash their content to detect real changes, rebuild the sitemap only when something changed (subject to a cooldown window), write the files atomically into the Nginx web root, and optionally ping search engines.
Core modules and functions: sitemap_auto/change_detection.py (StateStore, compute_content_hash), sitemap_auto/generator.py (UrlCollector, SitemapGenerator), the update_sitemap management command, and sitemap_auto/signals.py (post_save/post_delete handlers that mark the sitemap dirty).
server {
server_name example.com;
root /var/www/example.com/html;
    # serve sitemap.xml / sitemap_index.xml / .gz files directly as static content
location = /sitemap.xml { try_files /sitemap.xml /sitemap.xml.gz =404; }
location = /sitemap_index.xml { try_files /sitemap_index.xml /sitemap_index.xml.gz =404; }
    # optional: enable gzip for XML responses (adjust to your environment)
gzip on;
gzip_types application/xml text/xml;
}
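Once Nginx is serving the files, a quick standard-library check can confirm the sitemap responds as expected (a sketch; the URL is illustrative):
# Sketch: confirm the deployed sitemap is reachable (stdlib only; URL is illustrative).
import urllib.request

def check_sitemap(url: str = "https://www.example.com/sitemap.xml") -> int:
    req = urllib.request.Request(url, method="HEAD")
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.status  # expect 200

if __name__ == "__main__":
    print(check_sitemap())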
Reference the sitemap from robots.txt:
Sitemap: https://www.example.com/sitemap.xml
Suggested directory layout (example, matching the file paths used below):
your_app/
    apps.py
    management/
        commands/
            update_sitemap.py
    sitemap_auto/
        change_detection.py
        generator.py
        signals.py
# settings.py
SITEMAP_AUTO = {
    "BASE_URL": "https://www.example.com",
    "OUTPUT_DIR": "/var/www/example.com/html",   # Nginx web root
    "OUTPUT_NAME": "sitemap.xml",                # main sitemap file name
    "MAX_URLS_PER_FILE": 50000,
    "GZIP": True,
    "PING_SEARCH_ENGINES": True,
    "STATE_FILE": "/var/www/example.com/.sitemap_state.json",
    "CHANGE_COOLDOWN_SECONDS": 60,               # cooldown window for coalescing updates
    # declare every source that should end up in the sitemap
    "SOURCES": [
        # static routes (Django URL names)
        {"type": "static", "names": ["home", "about", "contact"]},
        # model source example: pages
        {
            "type": "model",
            "model": "pages.Page",               # app_label.ModelName
            "url_name": "page-detail",           # URL name passed to reverse()
            "lookup": "slug",                    # model field used as the URL kwarg
            "lastmod_field": "updated_at",
            "filter": {"is_published": True},
            "content_fields": ["title", "body", "slug", "is_published"]
        },
        # model source example: blog posts
        {
            "type": "model",
            "model": "blog.Post",
            "url_name": "blog-detail",
            "lookup": "slug",
            "lastmod_field": "updated_at",
            "filter": {"status": "published"},
            "content_fields": ["title", "content", "slug", "status"]
        },
    ],
}
# your_app/sitemap_auto/change_detection.py
import json, hashlib, time
from pathlib import Path
from typing import Dict, Optional
def canonicalize_text(text: str) -> str:
    # simplified: strip common HTML tags and collapse whitespace (swap in more advanced HTML handling if needed)
import re
text = re.sub(r"<[^>]+>", " ", text or "")
text = re.sub(r"\s+", " ", text).strip()
return text
def compute_content_hash(fields: Dict[str, str]) -> str:
canon = []
for k in sorted(fields.keys()):
v = fields[k]
canon.append(f"{k}:{canonicalize_text(str(v))}")
joined = "|".join(canon)
return hashlib.sha256(joined.encode("utf-8")).hexdigest()
class StateStore:
def __init__(self, path: str):
self.path = Path(path)
self.data = {"urls": {}, "last_write_ts": 0}
if self.path.exists():
try:
self.data = json.loads(self.path.read_text("utf-8"))
except Exception:
                # reset the state if the file cannot be read
self.data = {"urls": {}, "last_write_ts": 0}
def get_url_state(self, url: str) -> Optional[Dict]:
return self.data["urls"].get(url)
def set_url_state(self, url: str, content_hash: str, lastmod_iso: str):
self.data["urls"][url] = {"hash": content_hash, "lastmod": lastmod_iso}
def remove_missing(self, current_urls: set):
prev = set(self.data["urls"].keys())
for missing in prev - current_urls:
self.data["urls"].pop(missing, None)
def save(self):
tmp = self.path.with_suffix(".tmp")
tmp.write_text(json.dumps(self.data, ensure_ascii=False), "utf-8")
tmp.replace(self.path)
def mark_written(self):
self.data["last_write_ts"] = int(time.time())
# your_app/sitemap_auto/generator.py
import os, gzip, tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List, Dict, Tuple, Optional
from django.conf import settings
from django.urls import reverse
from django.apps import apps
from xml.sax.saxutils import escape
from .change_detection import StateStore, compute_content_hash
XML_HEADER = '<?xml version="1.0" encoding="UTF-8"?>\n'
NS_URLSET = 'http://www.sitemaps.org/schemas/sitemap/0.9'
def isoformat(dt: datetime) -> str:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat(timespec="seconds")
class UrlCollector:
def __init__(self, cfg: Dict):
self.cfg = cfg
self.base_url = cfg["BASE_URL"].rstrip("/")
def collect(self) -> List[Dict]:
"""
统一输出结构:
{
"loc": "https://www.example.com/xxx/",
"lastmod": "2025-11-26T12:00:00+00:00",
"changefreq": "daily", # 可选
"priority": 0.5, # 可选
"hash_fields": { ... } # 用于哈希检测的字段值
}
"""
urls = []
for src in self.cfg["SOURCES"]:
if src["type"] == "static":
for name in src["names"]:
path = reverse(name)
urls.append({
"loc": f"{self.base_url}{path}",
"lastmod": isoformat(datetime.now(timezone.utc)),
"hash_fields": {"url_name": name}
})
elif src["type"] == "model":
                model = apps.get_model(src["model"])  # accepts "app_label.ModelName"
qs = model.objects.all()
flt = src.get("filter") or {}
if flt:
qs = qs.filter(**flt)
lookup = src["lookup"]
lastmod_field = src.get("lastmod_field")
content_fields = src.get("content_fields", [])
                # read in batches to keep memory usage and lock time low
for obj in qs.iterator(chunk_size=2000):
                    # build the URL
kw = {lookup: getattr(obj, lookup)}
path = reverse(src["url_name"], kwargs=kw)
loc = f"{self.base_url}{path}"
# lastmod
if lastmod_field:
last_dt = getattr(obj, lastmod_field)
lastmod = isoformat(last_dt) if last_dt else isoformat(datetime.now(timezone.utc))
else:
lastmod = isoformat(datetime.now(timezone.utc))
                    # collect the content fields used for hashing
fields = {}
for f in content_fields:
fields[f] = getattr(obj, f, None)
urls.append({
"loc": loc,
"lastmod": lastmod,
"hash_fields": fields | {"loc": loc},
})
return urls
class SitemapGenerator:
def __init__(self, cfg: Dict):
self.cfg = cfg
self.output_dir = Path(cfg["OUTPUT_DIR"])
self.output_name = cfg.get("OUTPUT_NAME", "sitemap.xml")
self.max_urls = int(cfg.get("MAX_URLS_PER_FILE", 50000))
self.gzip_enabled = bool(cfg.get("GZIP", True))
self.state_store = StateStore(cfg["STATE_FILE"])
def _build_url_xml(self, url_items: Iterable[Dict]) -> bytes:
        # build the XML by string concatenation to avoid a huge in-memory tree; lxml/ElementTree would also work
parts = [XML_HEADER, f'<urlset xmlns="{NS_URLSET}">\n']
        for item in url_items:
            parts.append("  <url>\n")
            parts.append(f"    <loc>{escape(item['loc'])}</loc>\n")
            parts.append(f"    <lastmod>{item['lastmod']}</lastmod>\n")
            # add changefreq/priority here if configured
            parts.append("  </url>\n")
parts.append("</urlset>\n")
return "".join(parts).encode("utf-8")
def _write_atomic(self, path: Path, payload: bytes, gz: bool = False):
path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(dir=path.parent, delete=False) as tmp:
if gz:
with gzip.open(tmp, "wb", compresslevel=5) as gzf:
gzf.write(payload)
else:
tmp.write(payload)
tmp.flush()
os.fsync(tmp.fileno())
tmp_path = Path(tmp.name)
tmp_path.replace(path)
def _write_sitemap_files(self, urls: List[Dict]) -> Tuple[List[str], Optional[str]]:
"""
返回:([sitemap_part_urls], sitemap_index_url|None)
"""
base_url = self.cfg["BASE_URL"].rstrip("/")
if len(urls) <= self.max_urls:
xml = self._build_url_xml(urls)
out = self.output_dir / self.output_name
if self.gzip_enabled:
self._write_atomic(out.with_suffix(out.suffix + ".gz"), xml, gz=True)
self._write_atomic(out, xml, gz=False)
return [f"{base_url}/{self.output_name}"], None
        # split into multiple files plus a sitemap index
index_entries = []
part_urls = []
chunks = [urls[i:i+self.max_urls] for i in range(0, len(urls), self.max_urls)]
for idx, chunk in enumerate(chunks, start=1):
xml = self._build_url_xml(chunk)
name = f"sitemap-{idx}.xml"
out = self.output_dir / name
if self.gzip_enabled:
gz_out = out.with_suffix(out.suffix + ".gz")
self._write_atomic(gz_out, xml, gz=True)
part_urls.append(f"{base_url}/{gz_out.name}")
self._write_atomic(out, xml, gz=False)
part_urls.append(f"{base_url}/{name}")
            # index entry (pointing at .xml or .xml.gz; search engines accept either)
index_entries.append({
"loc": f"{base_url}/{name}",
"lastmod": isoformat(datetime.now(timezone.utc)),
})
        # write the sitemap index
parts = [XML_HEADER, '<sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n']
for it in index_entries:
parts.append(" <sitemap>\n")
parts.append(f" <loc>{it['loc']}</loc>\n")
parts.append(f" <lastmod>{it['lastmod']}</lastmod>\n")
parts.append(" </sitemap>\n")
parts.append("</sitemapindex>\n")
index_xml = "".join(parts).encode("utf-8")
index_out = self.output_dir / "sitemap_index.xml"
if self.gzip_enabled:
self._write_atomic(index_out.with_suffix(index_out.suffix + ".gz"), index_xml, gz=True)
self._write_atomic(index_out, index_xml, gz=False)
return part_urls, f"{base_url}/sitemap_index.xml"
def generate_and_publish(self) -> Dict:
collector = UrlCollector(self.cfg)
urls = collector.collect()
        # compare with the stored state to decide whether a rebuild is needed
changed = False
current_set = set()
for item in urls:
loc = item["loc"]
current_set.add(loc)
content_hash = compute_content_hash(item["hash_fields"])
prev = self.state_store.get_url_state(loc)
if not prev or prev["hash"] != content_hash:
changed = True
            # update the in-memory state (persisted later)
self.state_store.set_url_state(loc, content_hash, item["lastmod"])
        # drop URLs that no longer exist
        self.state_store.remove_missing(current_set)
        # coalesce/throttle: within the cooldown window, skip writing when nothing was added, removed, or materially changed
        cooldown = int(self.cfg.get("CHANGE_COOLDOWN_SECONDS", 0))
        if cooldown > 0 and not changed:
            # skip if a write happened recently (cooldown not yet elapsed) and nothing changed
            import time
            if time.time() - self.state_store.data.get("last_write_ts", 0) < cooldown:
                return {"changed": False, "reason": "cooldown_no_change"}
        # changes detected or cooldown elapsed: rebuild and publish
part_urls, index_url = self._write_sitemap_files(urls)
self.state_store.mark_written()
self.state_store.save()
        # optional: ping search engines (best effort; wrap in error handling and rate limiting)
if self.cfg.get("PING_SEARCH_ENGINES", False):
self._ping_search_engines(index_url or part_urls[0])
return {"changed": True, "count": len(urls), "index": index_url, "parts": part_urls}
def _ping_search_engines(self, sitemap_url: str):
import urllib.parse, urllib.request
endpoints = [
f"https://www.google.com/ping?sitemap={urllib.parse.quote(sitemap_url)}",
f"https://www.bing.com/ping?sitemap={urllib.parse.quote(sitemap_url)}",
]
for ep in endpoints:
try:
with urllib.request.urlopen(ep, timeout=5) as resp:
resp.read()
except Exception:
                # ignore ping failures so they never affect the main flow
pass
# your_app/management/commands/update_sitemap.py
from django.core.management.base import BaseCommand
from django.conf import settings
from your_app.sitemap_auto.generator import SitemapGenerator
class Command(BaseCommand):
help = "Generate and deploy XML sitemap to Nginx web root."
def add_arguments(self, parser):
parser.add_argument("--force", action="store_true", help="Force regenerate regardless of change detection.")
def handle(self, *args, **options):
cfg = getattr(settings, "SITEMAP_AUTO", None)
if not cfg:
self.stderr.write(self.style.ERROR("SITEMAP_AUTO not configured in settings.py"))
return
gen = SitemapGenerator(cfg)
if options["force"]:
            # force rebuild: clear the stored state first
gen.state_store.data = {"urls": {}, "last_write_ts": 0}
result = gen.generate_and_publish()
if result.get("changed"):
self.stdout.write(self.style.SUCCESS(
f"Sitemap updated. URLs: {result.get('count')}, index: {result.get('index')}"
))
else:
self.stdout.write(self.style.WARNING(f"No changes: {result.get('reason', '')}"))
# your_app/sitemap_auto/signals.py
import time
from pathlib import Path
from django.conf import settings
from django.db.models.signals import post_save, post_delete
from django.apps import apps
DIRTY_FLAG = Path(settings.SITEMAP_AUTO["OUTPUT_DIR"]) / ".sitemap_dirty.flag"
def mark_dirty(*args, **kwargs):
    # write a flag file that a cron job or Celery task can pick up to trigger regeneration
try:
DIRTY_FLAG.write_text(str(int(time.time())), "utf-8")
except Exception:
pass
def connect_signals():
cfg = settings.SITEMAP_AUTO
for src in cfg["SOURCES"]:
if src["type"] == "model":
app_label, model_name = src["model"].split(".")
model = apps.get_model(app_label, model_name)
post_save.connect(mark_dirty, sender=model, dispatch_uid=f"sitemap_dirty_save_{src['model']}")
post_delete.connect(mark_dirty, sender=model, dispatch_uid=f"sitemap_dirty_del_{src['model']}")
Connect the signals automatically from the Django AppConfig:
# your_app/apps.py
from django.apps import AppConfig
class YourAppConfig(AppConfig):
name = "your_app"
def ready(self):
try:
from .sitemap_auto.signals import connect_signals
connect_signals()
except Exception:
            # avoid breaking migrations if the import fails
pass
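A minimal sketch of a periodic task that consumes the dirty flag and triggers the management command (the module path and scheduler are illustrative; the flag path matches signals.py):
# your_app/sitemap_auto/tasks.py (illustrative; run from cron, Celery beat, etc.)
from pathlib import Path
from django.conf import settings
from django.core.management import call_command

def rebuild_if_dirty():
    """Rebuild the sitemap only when the dirty flag written by the signals exists."""
    flag = Path(settings.SITEMAP_AUTO["OUTPUT_DIR"]) / ".sitemap_dirty.flag"
    if not flag.exists():
        return
    call_command("update_sitemap")
    # clear the flag only after a successful run
    flag.unlink(missing_ok=True)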
# every 6 hours (matches the systemd timer alternative below)
0 */6 * * * /path/to/venv/bin/python /path/to/project/manage.py update_sitemap >> /var/log/update_sitemap.log 2>&1
[Unit]
Description=Update sitemap.xml
[Service]
Type=oneshot
WorkingDirectory=/path/to/project
ExecStart=/path/to/venv/bin/python manage.py update_sitemap
[Unit]
Description=Run sitemap update every 6 hours
[Timer]
OnCalendar=*-*-* 00/6:00:00
Persistent=true
[Install]
WantedBy=timers.target
systemctl enable --now update-sitemap.timer
This approach combines intelligent change detection, standards-compliant XML generation, atomic deployment, and runtime efficiency, and is tightly adapted to the Django + Nginx stack.
The following solution targets JavaScript/React sites deployed on a Linux VPS. It provides an automation script and practical guidance for intelligent page-change detection, XML sitemap generation, and deployment to the web root.
Install the following dependencies in a Python 3.10+ environment:
Installation example:
pip install playwright beautifulsoup4 lxml requests python-dotenv
python -m playwright install
Overall workflow: render and crawl the site with Playwright (BFS up to a maximum depth), normalize URLs, fingerprint the visible page text, compare the fingerprints against an SQLite cache to classify pages as new, modified, or removed, build sitemap.xml with lxml, and deploy it atomically to the web root.
Core functions: normalize_url, clean_html_to_text, Crawler.crawl, detect_changes, build_sitemap_xml, and deploy_sitemap.
To reduce false positives and missed changes in an SPA environment, the script strips noisy DOM regions (DEFAULT_EXCLUDE_SELECTORS plus any --exclude-selectors), removes tracking query parameters, prefers the canonical URL when one is declared, and respects robots.txt.
The generated XML follows the sitemaps.org 0.9 standard: a urlset in the http://www.sitemaps.org/schemas/sitemap/0.9 namespace with loc, lastmod, changefreq, and priority for each URL.
# edit the crontab
crontab -e
# run daily at 02:00
0 2 * * * /usr/bin/python3 /opt/sitemap/sitemap_updater.py \
--base-url https://example.com \
--web-root /var/www/example.com/html \
--max-depth 3 >> /var/log/sitemap_updater.log 2>&1
The script below targets React SPAs and supports Playwright-rendered crawling, change detection, and XML sitemap generation and deployment. Adjust the parameters and excluded selectors to match your site's structure.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import argparse
import hashlib
import logging
import os
import re
import shutil
import sqlite3
import time
from datetime import datetime, timezone
from typing import Dict, List, Set, Tuple, Optional
from urllib.parse import urlparse, urljoin, urlunparse, parse_qsl, urlencode
import requests
from bs4 import BeautifulSoup
from lxml import etree
from urllib.robotparser import RobotFileParser
from playwright.async_api import async_playwright
# ----------------------------
# Configuration and constants
# ----------------------------
DEFAULT_EXCLUDE_QUERY_KEYS = {
"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
"gclid", "fbclid", "msclkid", "ref", "ref_src"
}
DEFAULT_KEEP_QUERY_KEYS = {"page"}  # query keys to keep (e.g. pagination); adjust per site
DEFAULT_EXCLUDE_SELECTORS = [
"script", "style", "noscript",
    # common dynamic or noisy regions (adjust as needed)
"header .timestamp", ".ad", ".ads", ".advert", ".cookie-banner",
".toast", ".notification", ".modal", ".overlay", ".live-counter"
]
DB_DIR = os.path.join(os.path.expanduser("~"), ".sitemap_cache")
DB_PATH = os.path.join(DB_DIR, "sitemap.db")
# ----------------------------
# Utility functions
# ----------------------------
def ensure_dir(path: str):
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
def isoformat_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def normalize_url(url: str, base_netloc: str) -> Optional[str]:
"""归一化 URL:同源过滤、去碎片、过滤追踪参数,仅保留白名单查询键。"""
try:
parsed = urlparse(url)
if not parsed.scheme:
return None
if parsed.netloc != base_netloc:
return None
        # drop the fragment
fragmentless = parsed._replace(fragment="")
        # filter query parameters
query_pairs = parse_qsl(fragmentless.query, keep_blank_values=True)
filtered = []
for k, v in query_pairs:
if k in DEFAULT_EXCLUDE_QUERY_KEYS:
continue
if k in DEFAULT_KEEP_QUERY_KEYS:
filtered.append((k, v))
new_query = urlencode(filtered, doseq=True)
normalized = fragmentless._replace(query=new_query)
return urlunparse(normalized)
except Exception:
return None
def sha256_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()
def clean_html_to_text(html: str, extra_selectors: List[str]) -> str:
"""清洗 HTML,移除噪声,输出规范化文本用于指纹。"""
soup = BeautifulSoup(html, "html.parser")
    # remove the default selectors plus any extra ones
for selector in set(DEFAULT_EXCLUDE_SELECTORS + extra_selectors):
for tag in soup.select(selector):
tag.decompose()
    # drop hidden elements
for tag in soup.select("[hidden], [aria-hidden=true]"):
tag.decompose()
    # normalize the text
text = soup.get_text(separator=" ", strip=True)
text = re.sub(r"\s+", " ", text).strip()
return text
# ----------------------------
# Database persistence
# ----------------------------
def setup_db():
ensure_dir(DB_DIR)
conn = sqlite3.connect(DB_PATH)
conn.execute("""
CREATE TABLE IF NOT EXISTS pages (
url TEXT PRIMARY KEY,
content_hash TEXT,
last_seen TEXT,
last_modified TEXT
);
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_pages_last_seen ON pages(last_seen);")
conn.commit()
return conn
def load_existing(conn) -> Dict[str, Tuple[str, str, str]]:
cur = conn.execute("SELECT url, content_hash, last_seen, last_modified FROM pages;")
return {row[0]: (row[1], row[2], row[3]) for row in cur.fetchall()}
def upsert_page(conn, url: str, content_hash: str, last_modified: Optional[str] = None):
now = isoformat_now()
if last_modified is None:
last_modified = now
conn.execute("""
INSERT INTO pages(url, content_hash, last_seen, last_modified)
VALUES(?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
content_hash=excluded.content_hash,
last_seen=excluded.last_seen,
last_modified=CASE
WHEN pages.content_hash != excluded.content_hash THEN excluded.last_modified
ELSE pages.last_modified
END;
""", (url, content_hash, now, last_modified))
def mark_seen(conn, url: str):
now = isoformat_now()
conn.execute("UPDATE pages SET last_seen=? WHERE url=?;", (now, url))
# ----------------------------
# robots.txt handling
# ----------------------------
def load_robots(base_url: str) -> RobotFileParser:
parsed = urlparse(base_url)
robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
rp = RobotFileParser()
try:
rp.set_url(robots_url)
rp.read()
except Exception:
        # if robots.txt is unavailable, default to allowing everything
rp = RobotFileParser()
rp.parse([])
return rp
def allowed_by_robots(rp: RobotFileParser, url: str, ua: str = "SitemapUpdater"):
try:
return rp.can_fetch(ua, url)
except Exception:
return True
# ----------------------------
# Crawling with Playwright
# ----------------------------
class Crawler:
def __init__(self, base_url: str, max_depth: int = 3, concurrency: int = 4, extra_selectors: Optional[List[str]] = None):
self.base_url = base_url.rstrip("/")
self.parsed_base = urlparse(self.base_url)
self.base_netloc = self.parsed_base.netloc
self.max_depth = max_depth
self.concurrency = concurrency
self.extra_selectors = extra_selectors or []
self.visited: Set[str] = set()
self.discovered: Set[str] = set()
self.sem = asyncio.Semaphore(concurrency)
self.rp = load_robots(self.base_url)
    async def crawl(self) -> Dict[str, Dict]:
        """Return a mapping of URL -> {hash, lastmod}."""
        results: Dict[str, Dict] = {}
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            try:
                # BFS queue of (url, depth)
                queue: List[Tuple[str, int]] = [(self.base_url, 0)]
                tasks: List[asyncio.Task] = []
                while queue or tasks:
                    while queue:
                        url, depth = queue.pop(0)
                        if depth > self.max_depth:
                            continue
                        normalized = normalize_url(url, self.base_netloc)
                        if not normalized or normalized in self.visited:
                            continue
                        if not allowed_by_robots(self.rp, normalized):
                            logging.debug(f"Disallowed by robots: {normalized}")
                            continue
                        # mark as visited before scheduling to avoid crawling the same URL twice
                        self.visited.add(normalized)
                        # concurrency control via the semaphore (released in _process_url)
                        await self.sem.acquire()
                        tasks.append(asyncio.create_task(
                            self._process_url(context, normalized, depth, queue, results)))
                    # wait for the current wave to finish; crawled pages may enqueue new links
                    if tasks:
                        await asyncio.gather(*tasks)
                        tasks = []
            finally:
                await context.close()
                await browser.close()
        return results
async def _process_url(self, context, url: str, depth: int, queue: List[Tuple[str, int]], results: Dict[str, Dict]):
page = await context.new_page()
try:
logging.info(f"Crawling: {url} (depth={depth})")
response = await page.goto(url, wait_until="networkidle", timeout=30000)
            # some React SPAs always return 200; extract the content regardless
html = await page.content()
# Canonical URL
canonical = await page.evaluate("""
() => {
const link = document.querySelector('link[rel="canonical"]');
return link ? link.href : null;
}
""")
target_url = normalize_url(canonical, self.base_netloc) or url
            # content fingerprint
text = clean_html_to_text(html, self.extra_selectors)
content_hash = sha256_text(text)
results[target_url] = {
"hash": content_hash,
"lastmod": isoformat_now()
}
self.visited.add(target_url)
self.discovered.add(target_url)
            # discover new links
links = await page.evaluate("""
() => Array.from(document.querySelectorAll('a[href]'))
.map(a => a.href)
""")
for href in set(links or []):
norm = normalize_url(href, self.base_netloc)
if not norm:
continue
if norm in self.visited:
continue
                # skip common static assets and login/auth pages (extend as needed)
if re.search(r"\.(jpg|jpeg|png|gif|svg|webp|pdf|zip|rar|7z|mp4|mp3)$", norm, re.I):
continue
if re.search(r"/login|/auth|/signup", norm, re.I):
continue
queue.append((norm, depth + 1))
except Exception as e:
logging.warning(f"Failed to crawl {url}: {e}")
finally:
await page.close()
self.sem.release()
# ----------------------------
# Change detection and sitemap generation
# ----------------------------
def detect_changes(conn, crawled: Dict[str, Dict]) -> Tuple[List[str], List[str], List[str]]:
"""返回 (new_urls, modified_urls, removed_urls)"""
existing = load_existing(conn)
new_urls, modified_urls = [], []
current_set = set(crawled.keys())
previous_set = set(existing.keys())
for url, info in crawled.items():
h = info["hash"]
if url not in existing:
new_urls.append(url)
upsert_page(conn, url, h, last_modified=info["lastmod"])
else:
old_hash = existing[url][0]
if old_hash != h:
modified_urls.append(url)
upsert_page(conn, url, h, last_modified=info["lastmod"])
else:
mark_seen(conn, url)
removed_urls = sorted(list(previous_set - current_set))
return new_urls, modified_urls, removed_urls
def build_sitemap_xml(base_url: str, urls: List[Tuple[str, str]], default_changefreq: str = "daily") -> bytes:
"""
urls: List of (url, lastmod)
返回 XML 字节数据
"""
NS = "http://www.sitemaps.org/schemas/sitemap/0.9"
urlset = etree.Element("urlset", xmlns=NS)
    # priority based on path depth
def priority_for(u: str) -> str:
if u.rstrip("/") == base_url.rstrip("/"):
return "1.0"
depth = len(urlparse(u).path.strip("/").split("/")) if urlparse(u).path.strip("/") else 1
return "0.5" if depth <= 2 else "0.3"
for u, lastmod in sorted(urls):
url_el = etree.SubElement(urlset, "url")
loc = etree.SubElement(url_el, "loc"); loc.text = u
lm = etree.SubElement(url_el, "lastmod"); lm.text = lastmod
cf = etree.SubElement(url_el, "changefreq"); cf.text = default_changefreq
pr = etree.SubElement(url_el, "priority"); pr.text = priority_for(u)
return etree.tostring(urlset, xml_declaration=True, encoding="utf-8", pretty_print=True)
def deploy_sitemap(xml_bytes: bytes, web_root: str, filename: str = "sitemap.xml"):
ensure_dir(web_root)
tmp_path = os.path.join(web_root, f".{filename}.tmp")
final_path = os.path.join(web_root, filename)
with open(tmp_path, "wb") as f:
f.write(xml_bytes)
    # atomic replace
shutil.move(tmp_path, final_path)
    # permissions (adjust as needed)
os.chmod(final_path, 0o644)
logging.info(f"Sitemap deployed: {final_path}")
    # optional health check
    try:
        # could verify by requesting base_url + "/sitemap.xml";
        # skip it (or pass the full public URL in) if that is hard to determine here
        logging.info("Health check skipped (requires public URL).")
except Exception as e:
logging.warning(f"Health check failed: {e}")
# ----------------------------
# Main flow
# ----------------------------
async def run(base_url: str, web_root: str, max_depth: int, concurrency: int, extra_selectors: List[str]):
logging.info("Initializing database...")
conn = setup_db()
logging.info("Starting crawler...")
crawler = Crawler(base_url, max_depth=max_depth, concurrency=concurrency, extra_selectors=extra_selectors)
crawled = await crawler.crawl()
logging.info(f"Crawled URLs: {len(crawled)}")
new_urls, modified_urls, removed_urls = detect_changes(conn, crawled)
conn.commit()
logging.info(f"New: {len(new_urls)}, Modified: {len(modified_urls)}, Removed: {len(removed_urls)}")
    # build the sitemap (containing all valid URLs discovered in this run)
url_list = [(u, crawled[u]["lastmod"]) for u in sorted(crawled.keys())]
xml_bytes = build_sitemap_xml(base_url, url_list, default_changefreq="daily")
    # deploy
deploy_sitemap(xml_bytes, web_root)
conn.close()
logging.info("Done.")
def parse_args():
ap = argparse.ArgumentParser(description="React site XML sitemap auto updater")
ap.add_argument("--base-url", required=True, help="站点入口 URL,例如 https://example.com")
ap.add_argument("--web-root", required=True, help="网站根目录,例如 /var/www/example.com/html")
ap.add_argument("--max-depth", type=int, default=3, help="爬取最大深度,默认 3")
ap.add_argument("--concurrency", type=int, default=4, help="并发页面数,默认 4")
ap.add_argument("--exclude-selectors", nargs="*", default=[], help="额外排除的 CSS 选择器")
ap.add_argument("--log-level", default="INFO", choices=["DEBUG","INFO","WARNING","ERROR"])
return ap.parse_args()
if __name__ == "__main__":
args = parse_args()
logging.basicConfig(level=getattr(logging, args.log_level), format="%(asctime)s %(levelname)s %(message)s")
try:
asyncio.run(run(
base_url=args.base_url,
web_root=args.web_root,
max_depth=args.max_depth,
concurrency=args.concurrency,
extra_selectors=args.exclude_selectors
))
except KeyboardInterrupt:
logging.warning("Interrupted by user")
python /opt/sitemap/sitemap_updater.py \
--base-url https://example.com \
--web-root /var/www/example.com/html \
--max-depth 3 \
--concurrency 6 \
--exclude-selectors ".live-ticker" ".user-panel"
Integration with the React build: a route manifest exported at build time (for example from the router configuration) can be injected into the crawler.discovered set to reduce reliance on discovering pages through click paths; see the sketch after the robots.txt example below.
robots.txt:
User-agent: *
Allow: /
Sitemap: https://example.com/sitemap.xml
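A sketch of the build-integration idea mentioned above, assuming the React build exports a routes.json file (the file name and format are illustrative):
# Sketch: seed the crawler with routes exported by the React build.
import json

def load_build_routes(routes_file: str, base_url: str) -> list:
    with open(routes_file, "r", encoding="utf-8") as f:
        paths = json.load(f)  # e.g. ["/", "/about", "/blog/hello"]
    return [f"{base_url.rstrip('/')}/{p.lstrip('/')}" for p in paths]

# usage inside run(), before crawling (illustrative):
#   for u in load_build_routes("/opt/sitemap/routes.json", base_url):
#       crawler.discovered.add(u)  # known routes, independent of click paths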
For further customization (for example splitting into multiple sitemaps, priority strategies, aggregating API routes, or triggering the run from CI/CD), share your site's structure and scale and I can provide a tailored, extended script.
Example installation commands:
pip install gitpython python-frontmatter tomli PyYAML beautifulsoup4 lxml watchdog
# For Hugo itself, follow the official installation docs or use a base image such as klakegg/hugo
Recommended: add a JSON index output in Hugo (more accurate and avoids complex path mapping). In config.toml add:
[outputs]
home = ["HTML", "JSON"]
section = ["HTML", "JSON"]
page = ["HTML", "JSON"]
In layouts/_default/list.json and layouts/_default/single.json, emit the fields you need (permalink, lastmod, alternates, and so on); the Python script then reads public/*.json and merges the entries into an index.
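A condensed sketch of that merge step (the full implementation appears later as load_output_index; the field names follow the JSON template shown at the end of this section):
# Sketch: merge Hugo's JSON outputs under public/ into a flat URL index.
import json
from pathlib import Path

def merge_json_indexes(publish_dir: str = "public") -> list:
    entries = []
    for jf in Path(publish_dir).glob("*.json"):
        data = json.loads(jf.read_text(encoding="utf-8"))
        pages = data if isinstance(data, list) else data.get("pages", [])
        for page in pages:
            if isinstance(page, dict) and page.get("permalink"):
                entries.append({"loc": page["permalink"], "lastmod": page.get("lastmod")})
    return entries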
Example CronJob manifest (simplified):
apiVersion: batch/v1
kind: CronJob
metadata:
  name: sitemap-updater
spec:
  schedule: "*/30 * * * *"  # every 30 minutes
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: updater
              image: ghcr.io/your-org/hugo-python:latest  # image with Hugo + Python
              command: ["python", "/app/sitemap_updater.py"]
              env:
                - name: SITE_BASE_URL
                  value: "https://example.com"
                - name: SITE_ROOT_PATH
                  value: "/site-root"   # mount point of the web root
                - name: HUGO_PUBLISH_DIR
                  value: "/site-root"   # Hugo output directory, same as the web root
              volumeMounts:
                - name: site-pvc
                  mountPath: /site-root
              # to pull sources from Git, add an initContainer here that runs git clone
          restartPolicy: OnFailure
          volumes:
            - name: site-pvc
              persistentVolumeClaim:
                claimName: site-content-pvc
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hugo + Kubernetes 自动更新 XML 网站地图脚本
- 变更检测(Git + Front Matter + 内容哈希)
- 构建输出索引(优先读取自定义 JSON;回退扫描 public)
- 生成 sitemap.xml(可选分片与 gzip)
- 部署到网站根目录(publishDir 或共享卷)
"""
import os
import sys
import json
import hashlib
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import frontmatter
from git import Repo
from bs4 import BeautifulSoup
try:
    import tomli  # TOML parser; Python 3.11+ ships tomllib in the stdlib
except ImportError:
tomli = None
try:
import yaml # PyYAML
except ImportError:
yaml = None
from xml.etree.ElementTree import Element, SubElement, ElementTree, register_namespace
# --------------------
# Configuration and constants
# --------------------
CACHE_DIR = Path(".cache")
STATE_FILE = CACHE_DIR / "sitemap_state.json"
DEFAULT_PUBLISH_DIR = "public"
XML_NS = "http://www.sitemaps.org/schemas/sitemap/0.9"
XML_XHTML_NS = "http://www.w3.org/1999/xhtml"
MAX_URLS_PER_SITEMAP = 50000
# --------------------
# Utility functions
# --------------------
def isoformat_utc(dt: datetime) -> str:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def read_file_bytes(path: Path) -> bytes:
with path.open("rb") as f:
return f.read()
def ensure_cache_dir():
CACHE_DIR.mkdir(exist_ok=True)
def load_json(path: Path) -> dict:
if not path.exists():
return {}
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: Path, data: dict):
with path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def log(msg: str):
print(f"[{datetime.now().isoformat(timespec='seconds')}] {msg}", flush=True)
# --------------------
# Configuration loading
# --------------------
def load_hugo_config(config_path: Optional[Path] = None) -> dict:
"""
支持读取 config.toml / config.yaml
"""
cfg = {}
if config_path is None:
        # look in the conventional locations
candidates = [Path("config.toml"), Path("config.yaml"), Path("config.yml")]
config_path = next((p for p in candidates if p.exists()), None)
if config_path is None:
log("未找到 Hugo 配置文件,使用环境变量与默认值。")
return cfg
if config_path.suffix == ".toml":
if tomli is None:
raise RuntimeError("tomli 未安装,无法解析 TOML。请 pip install tomli。")
with config_path.open("rb") as f:
cfg = tomli.load(f)
else:
if yaml is None:
raise RuntimeError("PyYAML 未安装,无法解析 YAML。请 pip install PyYAML。")
with config_path.open("r", encoding="utf-8") as f:
cfg = yaml.safe_load(f)
return cfg or {}
def get_base_url(cfg: dict) -> str:
env = os.getenv("SITE_BASE_URL")
base = env or cfg.get("baseURL", "") or ""
base = base.strip()
if not base:
raise RuntimeError("缺少 baseURL(请在 config 或环境变量 SITE_BASE_URL 指定)。")
# 统一去除末尾斜杠
return base.rstrip("/")
def get_publish_dir(cfg: dict) -> Path:
env = os.getenv("HUGO_PUBLISH_DIR")
pub = env or cfg.get("publishDir", DEFAULT_PUBLISH_DIR)
return Path(pub)
def get_site_root_path() -> Optional[Path]:
    # if the web root differs from publishDir, use this variable to set the deployment path
p = os.getenv("SITE_ROOT_PATH")
return Path(p) if p else None
# --------------------
# Git 与 Front Matter
# --------------------
def locate_repo(start: Path = Path(".")) -> Optional[Repo]:
try:
return Repo(start, search_parent_directories=True)
except Exception:
return None
def git_last_commit_iso(repo: Repo, file_path: Path) -> Optional[str]:
try:
rel = str(file_path)
commits = list(repo.iter_commits(paths=rel, max_count=1))
if commits:
return commits[0].committed_datetime.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
except Exception:
pass
return None
def normalize_frontmatter(meta: dict) -> dict:
"""
仅保留与输出/URL相关的关键字段,降低误报
"""
keys = ["slug", "url", "aliases", "title", "date", "lastmod", "draft", "type", "layout", "permalink"]
return {k: meta.get(k) for k in keys if k in meta}
def compute_page_fingerprint(file_path: Path, repo: Optional[Repo]) -> Tuple[str, str]:
"""
返回 (fingerprint_hash, lastmod_iso)
"""
try:
post = frontmatter.load(file_path)
meta_norm = normalize_frontmatter(post.metadata or {})
content_bytes = (post.content or "").encode("utf-8")
        # combine front matter and body
meta_bytes = json.dumps(meta_norm, ensure_ascii=False, sort_keys=True).encode("utf-8")
fingerprint = sha256_bytes(meta_bytes + b"\n" + content_bytes)
        # lastmod precedence: frontmatter.lastmod -> git commit -> frontmatter.date -> file mtime
lastmod = None
lm = post.metadata.get("lastmod")
if lm:
lastmod = lm if isinstance(lm, str) else str(lm)
if not lastmod and repo:
lastmod = git_last_commit_iso(repo, file_path)
if not lastmod and post.metadata.get("date"):
lm_date = post.metadata.get("date")
lastmod = lm_date if isinstance(lm_date, str) else str(lm_date)
if not lastmod:
ts = datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc)
lastmod = isoformat_utc(ts)
return fingerprint, lastmod
except Exception as e:
log(f"Front Matter 解析失败: {file_path} - {e}")
# 回退:仅用文件内容与 mtime
data = read_file_bytes(file_path)
fingerprint = sha256_bytes(data)
ts = datetime.fromtimestamp(file_path.stat().st_mtime, tz=timezone.utc)
lastmod = isoformat_utc(ts)
return fingerprint, lastmod
def list_content_files(content_dir: Path = Path("content")) -> List[Path]:
exts = {".md", ".markdown", ".html"}
files = []
for p in content_dir.rglob("*"):
if p.is_file() and p.suffix.lower() in exts:
files.append(p)
return files
# --------------------
# 变更检测
# --------------------
def detect_changes(content_dir: Path, repo: Optional[Repo]) -> Tuple[bool, dict]:
"""
返回 (has_changes, state)
state: { "pages": {rel_path: {"fp": ..., "lastmod": ..., "draft": bool}}, "git_head": sha }
"""
ensure_cache_dir()
prev = load_json(STATE_FILE)
prev_pages = prev.get("pages", {})
prev_head = prev.get("git_head")
current = {"pages": {}, "git_head": None}
if repo:
try:
current["git_head"] = repo.head.commit.hexsha
except Exception:
current["git_head"] = None
has_changes = False
for f in list_content_files(content_dir):
rel = str(f)
fingerprint, lastmod = compute_page_fingerprint(f, repo)
        # read the draft flag (defaults to False)
try:
post = frontmatter.load(f)
draft = bool(post.metadata.get("draft", False))
except Exception:
draft = False
current["pages"][rel] = {"fp": fingerprint, "lastmod": lastmod, "draft": draft}
prev_rec = prev_pages.get(rel)
if not prev_rec:
if not draft:
                has_changes = True  # new non-draft page
else:
if prev_rec.get("fp") != fingerprint or prev_rec.get("draft") != draft:
                has_changes = True  # substantive change, or the draft status flipped
    # detect deletions
prev_set = set(prev_pages.keys())
curr_set = set(current["pages"].keys())
deleted = prev_set - curr_set
if deleted:
has_changes = True
    # a change of Git HEAD also triggers a rebuild (keeps things in sync with CI)
if current["git_head"] and current["git_head"] != prev_head:
has_changes = True
return has_changes, current
# --------------------
# Hugo 构建与输出索引
# --------------------
def run_hugo_build():
cmd = ["hugo", "--minify"]
log(f"执行 Hugo 构建: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
def parse_canonical_from_html(html_path: Path) -> Optional[str]:
try:
with html_path.open("r", encoding="utf-8", errors="ignore") as f:
soup = BeautifulSoup(f, "html.parser")
link = soup.find("link", rel="canonical")
if link and link.get("href"):
return link["href"].strip()
except Exception:
pass
return None
def derive_url_from_path(base_url: str, rel_path: str) -> str:
    # Hugo output: /path/index.html -> /path/, /file.html -> /file.html
if rel_path.endswith("/index.html"):
url_path = rel_path[: -len("index.html")]
else:
url_path = rel_path
url_path = url_path.replace("\\", "/")
    # avoid duplicate slashes
return f"{base_url}/{url_path}".replace("//", "/").replace(":/", "://")
def load_output_index(publish_dir: Path, base_url: str) -> List[dict]:
"""
返回 [{loc, lastmod}] 列表
优先使用自定义 JSON 索引(public/*.json),否则扫描 HTML。
"""
urls = []
    # 1) try the custom JSON indexes (site-specific; here we scan the top-level JSON files)
json_files = list(publish_dir.glob("*.json"))
if json_files:
for jf in json_files:
try:
data = load_json(jf)
                # tolerate either a plain array or an object with a "pages" list
if isinstance(data, list):
items = data
elif isinstance(data, dict) and "pages" in data:
items = data["pages"]
else:
items = []
for it in items:
if not isinstance(it, dict):
continue
loc = it.get("permalink") or it.get("loc") or it.get("url")
if not loc:
continue
lastmod = it.get("lastmod")
if not lastmod:
                        # fall back to the file mtime
ts = datetime.fromtimestamp(jf.stat().st_mtime, tz=timezone.utc)
lastmod = isoformat_utc(ts)
urls.append({"loc": loc, "lastmod": lastmod})
except Exception as e:
log(f"读取索引 JSON 失败: {jf} - {e}")
# 2) 回退扫描 HTML 文件
if not urls:
for p in publish_dir.rglob("*.html"):
rel = str(p.relative_to(publish_dir))
loc = parse_canonical_from_html(p) or derive_url_from_path(base_url, rel)
ts = datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc)
urls.append({"loc": loc, "lastmod": isoformat_utc(ts)})
    # de-duplicate
seen = set()
dedup = []
for u in urls:
if u["loc"] in seen:
continue
seen.add(u["loc"])
dedup.append(u)
return dedup
# --------------------
# 网站地图生成
# --------------------
def build_sitemap(urls: List[dict], out_path: Path, alternates: Optional[Dict[str, List[dict]]] = None):
"""
生成 sitemap.xml;可扩展 alternates 用于多语言
alternates 示例: { loc: [ {"hreflang": "en", "href": "https://.../en/..."} ] }
"""
urlset = Element("urlset")
urlset.set("xmlns", XML_NS)
urlset.set(f"{{{XML_NS}}}schemaLocation", XML_NS) # 可选
urlset.set(f"xmlns:xhtml", XML_XHTML_NS)
for u in urls:
url_el = SubElement(urlset, "url")
SubElement(url_el, "loc").text = u["loc"]
if u.get("lastmod"):
SubElement(url_el, "lastmod").text = u["lastmod"]
if u.get("changefreq"):
SubElement(url_el, "changefreq").text = u["changefreq"]
if u.get("priority"):
SubElement(url_el, "priority").text = str(u["priority"])
if alternates and u["loc"] in alternates:
for alt in alternates[u["loc"]]:
link_el = SubElement(url_el, f"{{{XML_XHTML_NS}}}link")
link_el.set("rel", "alternate")
link_el.set("hreflang", alt.get("hreflang", "x-default"))
link_el.set("href", alt["href"])
tree = ElementTree(urlset)
out_path.parent.mkdir(parents=True, exist_ok=True)
tree.write(out_path, encoding="utf-8", xml_declaration=True)
log(f"已生成网站地图: {out_path} ({len(urls)} 条)")
# --------------------
# 部署与状态更新
# --------------------
def deploy_sitemap(publish_dir: Path, site_root: Optional[Path], sitemap_name: str = "sitemap.xml"):
src = publish_dir / sitemap_name
if site_root and site_root != publish_dir:
dst = site_root / sitemap_name
data = read_file_bytes(src)
with dst.open("wb") as f:
f.write(data)
log(f"已部署网站地图到站点根目录: {dst}")
else:
log(f"网站根目录即 publishDir,无需额外拷贝: {src}")
def update_state(state: dict):
save_json(STATE_FILE, state)
log(f"已更新状态缓存: {STATE_FILE}")
# --------------------
# 主流程
# --------------------
def main():
    # 1) load configuration and environment
cfg = load_hugo_config()
base_url = get_base_url(cfg)
publish_dir = get_publish_dir(cfg)
site_root = get_site_root_path()
content_dir = Path("content")
repo = locate_repo()
    # 2) change detection
has_changes, state = detect_changes(content_dir, repo)
if not has_changes and (publish_dir / "sitemap.xml").exists():
log("检测到无变化,且 sitemap 已存在,跳过重建。")
return
    # 3) Hugo build
run_hugo_build()
    # 4) build the output index
    urls = load_output_index(publish_dir, base_url)
    # optional: set changefreq/priority by section or type
    for u in urls:
        # simple example rule: the home page gets the highest priority
if u["loc"].rstrip("/") == base_url:
u["priority"] = 0.8
u["changefreq"] = "daily"
elif "/posts/" in u["loc"] or "/blog/" in u["loc"]:
u["priority"] = 0.6
u["changefreq"] = "weekly"
else:
u["priority"] = 0.5
u["changefreq"] = "monthly"
    # 5) generate the sitemap
sitemap_path = publish_dir / "sitemap.xml"
build_sitemap(urls, sitemap_path)
    # 6) deploy to the web root
deploy_sitemap(publish_dir, site_root)
    # 7) update the cached state
update_state(state)
if __name__ == "__main__":
try:
t0 = time.time()
main()
log(f"完成,耗时 {time.time() - t0:.2f}s")
except subprocess.CalledProcessError as e:
log(f"Hugo 构建失败: {e}")
sys.exit(1)
except Exception as e:
log(f"执行失败: {e}")
sys.exit(1)
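After a run, the deployed file can be sanity-checked with the standard library (a sketch; the path is illustrative):
# Sketch: count the <loc> entries in the generated sitemap.
import xml.etree.ElementTree as ET

def count_sitemap_urls(path: str = "public/sitemap.xml") -> int:
    ns = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}
    root = ET.parse(path).getroot()
    return len(root.findall("sm:url/sm:loc", ns))

print(count_sitemap_urls())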
In layouts/_default/sitemap.json (or the corresponding list/single JSON templates):
{{- /* emit a basic page index */ -}}
{
"pages": [
{{- $first := true -}}
{{- range .Site.RegularPages -}}
{{- if not $first }},{{ end -}}
{
"permalink": "{{ .Permalink }}",
"lastmod": "{{ .Lastmod.Format "2006-01-02T15:04:05Z07:00" }}",
"section": "{{ .Section }}",
"language": "{{ .Lang }}"
}
{{- $first = false -}}
{{- end -}}
]
}
Configure the matching outputs in config.toml:
[outputs]
home = ["HTML", "JSON"]
page = ["HTML", "JSON"]
section = ["HTML", "JSON"]