×
¥
查看详情
🔥 会员专享 文生文 爬取

编写数据爬取脚本

👁️ 376 次查看
📅 Sep 27, 2025
💡 核心价值: 利用Beautiful Soup编写Python脚本从指定URL提取数据。

🎯 可自定义参数(2个)

输入URL
需要抓取数据的目标URL,例如:https://example.com
输出语言
输出内容的语言类型,例如:中文

🎨 效果示例

以下脚本使用requests和BeautifulSoup从指定促销页抓取商品数据,并进行基础的预处理(文本清理、价格解析、字段标准化)、去重、结构化输出(CSV、JSON)。脚本包含对robots.txt的合规性检查、重试与限速、以及多策略解析(页面卡片、Microdata、JSON-LD),以提升在未知前端结构下的鲁棒性。可直接运行,默认保存到当前目录。

使用说明(简要):

  • 依赖:Python 3.8+,requests,beautifulsoup4
  • 安装:pip install requests beautifulsoup4
  • 运行:python scrape_promo.py
  • 输出:promo_summer_2025_products.csv,promo_summer_2025_products.json

脚本(scrape_promo.py): import requests from bs4 import BeautifulSoup import re import json import csv import logging import time from urllib.parse import urljoin, urlparse from urllib.robotparser import RobotFileParser from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry

# --- Scrape configuration -------------------------------------------------
PROMO_URL = "https://shop.testsite.cn/promo/summer-2025"
# Browser-like UA string sent with every request.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
REQUEST_TIMEOUT = 10  # per-request timeout, seconds
RATE_LIMIT_SECONDS = 1.0  # basic rate limit so we do not hammer the site

# Configure root logging once at import time; every helper logs via the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

def setup_session():
    """Build a requests.Session with retry/backoff and browser-like headers.

    GET requests are retried up to 3 times on 429/5xx responses with
    exponential backoff.
    """
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET"])
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        # Fix: the wildcard media range "*/*" was mangled to "/" in the
        # original Accept header.
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    })
    return session

def is_allowed_by_robots(url, user_agent=USER_AGENT, session=None):
    """Check robots.txt for *url*; on any failure default to allowing the fetch."""
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    try:
        if session:
            # Fetch robots.txt through the shared session (keeps UA/retries).
            resp = session.get(robots_url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                rp.parse(resp.text.splitlines())
            else:
                # robots.txt unavailable: treat as allowed, but warn.
                logging.warning("无法获取 robots.txt,默认按允许处理:%s", robots_url)
                return True
        else:
            rp.set_url(robots_url)
            rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception as e:
        logging.warning("检查 robots.txt 时出错:%s", e)
        return True

def fetch_html(url, session):
    """Fetch *url* with rate limiting and return decoded HTML text.

    Raises requests.HTTPError on non-2xx responses.
    """
    time.sleep(RATE_LIMIT_SECONDS)
    resp = session.get(url, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    # requests defaults to ISO-8859-1 when the server omits a charset;
    # prefer the sniffed encoding in that case.
    if not resp.encoding or resp.encoding.lower() == "iso-8859-1":
        resp.encoding = resp.apparent_encoding or "utf-8"
    return resp.text

def clean_text(text):
    """Collapse whitespace runs to single spaces and strip; None passes through."""
    if text is None:
        return None
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()

def parse_price(text):
    """Parse a price string into ``(value, currency)``.

    Supports formats like ``¥1,299.00``, ``¥899``, ``1299 元`` and ``CNY 499``.
    Returns ``(None, None)`` when nothing parseable is found; RMB symbols are
    normalised to ``"CNY"``.

    Fixes over the extracted original: restored the named groups that were
    stripped (``(?P<currency>``/``(?P<value>``), escaped the decimal points,
    required at least one thousands separator in the grouped branch (so
    "1299" is not truncated to 129 by the ``\\d{1,3}`` prefix), and added the
    trailing-currency case ("1299 元") promised by the docstring.
    """
    if not text:
        return None, None
    t = re.sub(r"\s+", " ", text).strip()
    m = re.search(
        r"(?P<currency>¥|¥|CNY|RMB|元)?\s*"
        r"(?P<value>\d{1,3}(?:[,\s]\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)"
        r"\s*(?P<trailing>元|CNY|RMB)?",
        t,
    )
    if not m:
        return None, None
    raw_val = m.group("value").replace(",", "").replace(" ", "")
    try:
        val = float(raw_val)
    except ValueError:
        val = None
    cur = m.group("currency") or m.group("trailing")
    # Normalise all RMB markers to ISO code.
    if cur in ["¥", "¥", "元"]:
        cur = "CNY"
    return val, cur

def parse_discount(text):
    """Extract a percentage like "30%" and return it as a fraction in (0, 1]."""
    if not text:
        return None
    normalized = re.sub(r"\s+", " ", text).strip()
    m = re.search(r"(\d{1,3})\s*%", normalized)
    if not m:
        return None
    pct = int(m.group(1))
    return pct / 100.0 if 0 < pct <= 100 else None

def parse_rating(text):
    """Parse ratings like "4.5/5", "评分 4.2 星" or "4 stars"; return float or None.

    Fix: the extracted pattern used an unescaped "." which matched any
    character, so text like "4a5/5" could be captured and then crash
    ``float()``; the decimal point is now escaped.
    """
    if not text:
        return None
    t = re.sub(r"\s+", " ", text).strip()
    m = re.search(r"(\d+(?:\.\d+)?)\s*(?:/5|星|stars)", t, flags=re.IGNORECASE)
    return float(m.group(1)) if m else None

def extract_jsonld_products(soup, base_url):
    """Extract Product entries from JSON-LD <script> blocks.

    Handles a bare object, a list of objects, and an ItemList wrapper whose
    itemListElement entries may be ListItem dicts carrying an "item" key.
    Relative URLs are resolved against *base_url*.
    """
    products = []
    for tag in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(tag.string or tag.get_text())
        except Exception:
            # Malformed JSON-LD: skip this block.
            continue
        items = []
        if isinstance(data, list):
            items = data
        elif isinstance(data, dict):
            if data.get("@type") == "ItemList" and "itemListElement" in data:
                for el in data["itemListElement"]:
                    # List entries may be ListItem wrappers; unwrap "item".
                    obj = el.get("item") if isinstance(el, dict) else el
                    if obj:
                        items.append(obj)
            else:
                items = [data]
        for obj in items:
            if not isinstance(obj, dict):
                continue
            typ = obj.get("@type")
            if isinstance(typ, list):
                is_product = "Product" in typ
            else:
                is_product = typ == "Product"
            if not is_product:
                continue
            name = clean_text(obj.get("name"))
            url = obj.get("url")
            image = obj.get("image")
            description = clean_text(obj.get("description"))
            sku = obj.get("sku") or obj.get("mpn")
            brand = None
            if isinstance(obj.get("brand"), dict):
                brand = obj["brand"].get("name")
            offers = obj.get("offers")
            price = None
            currency = None
            original_price = None
            availability = None
            if isinstance(offers, list) and offers:
                off = offers[0]
                price, currency, availability = off.get("price"), off.get("priceCurrency"), off.get("availability")
            elif isinstance(offers, dict):
                price, currency, availability = offers.get("price"), offers.get("priceCurrency"), offers.get("availability")
            # Fix: escaped the decimal point (the extracted "." matched any char).
            price_val = float(price) if isinstance(price, (int, float, str)) and re.match(r"^\d+(\.\d+)?$", str(price)) else None
            url = urljoin(base_url, url) if url else None
            image = urljoin(base_url, image) if isinstance(image, str) else image
            products.append({
                "name": name,
                "product_url": url,
                "image_url": image,
                "sku": sku,
                "brand": brand,
                "price": price_val,
                "currency": currency,
                "original_price": original_price,
                "discount_rate": None,
                "stock_status": availability,
                "rating": None,
                "review_count": None,
                "description": description,
                "source": "jsonld"
            })
    return products

def find_product_cards(soup):
    """Return likely product-card elements via class-substring selectors, de-duplicated."""
    selectors = (
        'div[class*="product"]', 'li[class*="product"]', 'article[class*="product"]',
        'div[class*="item"]', 'li[class*="item"]',
        'div[class*="card"]', 'li[class*="card"]'
    )
    seen_ids = set()
    cards = []
    for selector in selectors:
        for element in soup.select(selector):
            # The same element can match several selectors; keep the first hit.
            marker = id(element)
            if marker not in seen_ids:
                seen_ids.add(marker)
                cards.append(element)
    return cards

def first_match_text(el, selectors_or_class_keywords):
    """Return the first non-empty text found by CSS selectors, then by class keywords.

    Each entry is tried first as a CSS selector; entries are then reused as
    substring keywords against element class attributes.
    """
    if isinstance(selectors_or_class_keywords, (list, tuple)):
        # Pass 1: treat entries as CSS selectors.
        for sel in selectors_or_class_keywords:
            found = el.select_one(sel)
            if found and clean_text(found.get_text()):
                return clean_text(found.get_text())
        # Pass 2: treat entries as class-name keywords. bs4 may hand the
        # class attribute over as a list of tokens or as a plain string.
        for kw in selectors_or_class_keywords:
            matcher = lambda c: (c and kw in " ".join(c)) if isinstance(c, list) else (c and kw in c)
            for tag in el.find_all(True, class_=matcher):
                txt = clean_text(tag.get_text())
                if txt:
                    return txt
    return None

def get_attr_chain(el, attrs):
    """Return the first truthy attribute value of *el* among *attrs*, else None."""
    for name in attrs:
        value = el.get(name)
        if value:
            return value
    return None

def _class_contains(css_class, keywords):
    """True if any keyword occurs as a substring of the element's class attribute.

    bs4 normally passes ``class`` as a list of tokens. The original lambdas for
    discount/original-price/rating tested ``kw in c`` directly on that list,
    which checks exact token equality rather than substring match (so e.g.
    class "item-discount" was missed) — fixed by joining list tokens first.
    """
    if not css_class:
        return False
    text = " ".join(css_class) if isinstance(css_class, list) else css_class
    return any(kw in text for kw in keywords)


def parse_product_card(card, base_url):
    """Best-effort extraction of one product dict from a card element.

    Returns a dict with the same schema as the JSON-LD/Microdata extractors;
    fields that cannot be found are None.
    """
    # --- name: headline-ish tags first, then the image alt text ---
    name = None
    name_selectors = ['h1', 'h2', 'h3', 'a[class*="title"]', 'a[class*="name"]',
                      'div[class*="title"]', 'span[class*="title"]']
    for sel in name_selectors:
        tag = card.select_one(sel)
        if tag:
            txt = clean_text(tag.get_text())
            if txt:
                name = txt
                break
    if not name:
        img = card.find("img")
        if img:
            alt = clean_text(img.get("alt"))
            if alt:
                name = alt

    # --- product link: first real anchor (skip in-page "#" links) ---
    product_url = None
    a = card.find("a", href=True)
    if a:
        href = a.get("href")
        if href and not href.startswith("#"):
            product_url = urljoin(base_url, href)

    # --- image: lazy-load attributes take precedence over src ---
    image_url = None
    img = card.find("img")
    if img:
        image_url = get_attr_chain(img, ["data-src", "data-original", "src"])
        if image_url:
            image_url = urljoin(base_url, image_url)

    # --- SKU: data attributes, then a visible "SKU: xxx" text pattern ---
    sku = get_attr_chain(card, ["data-sku", "data-id"])
    if not sku:
        m = re.search(r"(SKU|货号)\s*[::]\s*([A-Za-z0-9\-_]+)", card.get_text())
        if m:
            sku = m.group(2)

    # --- prices: current price from price/sale/final blocks ---
    price = None
    currency = None
    original_price = None
    price_blocks = card.find_all(True, class_=lambda c: _class_contains(c, ("price", "sale", "final")))
    if price_blocks:
        for pb in price_blocks:
            val, cur = parse_price(pb.get_text())
            if val:
                price, currency = val, cur or currency
                break
        # Original price usually sits in strikethrough tags or original/was/list classes.
        orig_blocks = card.find_all(["del", "s", "strike"]) + card.find_all(
            True, class_=lambda c: _class_contains(c, ("original", "was", "list")))
        for ob in orig_blocks:
            val, cur = parse_price(ob.get_text())
            if val:
                original_price = val
                if not currency and cur:
                    currency = cur
                break

    # --- discount: explicit badge first, else derive from the two prices ---
    discount_rate = None
    disc_blocks = card.find_all(True, class_=lambda c: _class_contains(c, ("discount", "off")))
    for db in disc_blocks:
        dr = parse_discount(db.get_text())
        if dr:
            discount_rate = dr
            break
    if not discount_rate and original_price and price and original_price > 0 and price <= original_price:
        discount_rate = 1.0 - (price / original_price)

    # --- rating and review count ---
    rating = None
    review_count = None
    rating_blocks = card.find_all(True, class_=lambda c: _class_contains(c, ("rating", "stars")))
    for rb in rating_blocks:
        r = parse_rating(rb.get_text())
        if r:
            rating = r
            break
    m = re.search(r"(\d+)\s*(条评|评论|reviews)", card.get_text(), flags=re.IGNORECASE)
    if m:
        review_count = int(m.group(1))

    # --- stock status from visible keywords ---
    stock_status = None
    text_all = card.get_text()
    if re.search(r"(有货|现货|在库)", text_all):
        stock_status = "InStock"
    elif re.search(r"(缺货|售罄|无货|OutOfStock)", text_all, flags=re.IGNORECASE):
        stock_status = "OutOfStock"

    return {
        "name": name,
        "product_url": product_url,
        "image_url": image_url,
        "sku": sku,
        "brand": None,
        "price": price,
        "currency": currency,
        "original_price": original_price,
        "discount_rate": discount_rate,
        "stock_status": stock_status,
        "rating": rating,
        "review_count": review_count,
        "description": None,
        "source": "card"
    }

def extract_microdata_products(soup, base_url):
    """Extract products annotated with schema.org Microdata (itemscope/itemprop)."""
    products = []
    for prod in soup.find_all(attrs={"itemscope": True, "itemtype": re.compile("Product", re.IGNORECASE)}):
        def get_itemprop(prop):
            # Prefer visible text; fall back to a meta-style "content" attribute.
            tag = prod.find(attrs={"itemprop": prop})
            return clean_text(tag.get_text()) if tag and tag.get_text() else (tag.get("content") if tag else None)
        name = get_itemprop("name")
        url = get_itemprop("url")
        image = get_itemprop("image")
        sku = get_itemprop("sku") or get_itemprop("mpn")
        price = get_itemprop("price")
        currency = get_itemprop("priceCurrency")
        # Fix: escaped the decimal point (the extracted "." matched any char).
        price_val = float(price) if price and re.match(r"^\d+(\.\d+)?$", str(price)) else None
        products.append({
            "name": name,
            "product_url": urljoin(base_url, url) if url else None,
            "image_url": urljoin(base_url, image) if image else None,
            "sku": sku,
            "brand": get_itemprop("brand"),
            "price": price_val,
            "currency": currency or "CNY",
            "original_price": None,
            "discount_rate": None,
            "stock_status": None,
            "rating": None,
            "review_count": None,
            "description": get_itemprop("description"),
            "source": "microdata"
        })
    return products

def deduplicate(products):
    """De-duplicate product dicts, preserving order.

    Primary key is (sku, product_url); when both are empty, fall back to an
    approximate (name, price) key.
    """
    seen = set()
    unique = []
    for product in products:
        key = (
            (product.get("sku") or "").strip().lower(),
            (product.get("product_url") or "").strip().lower()
        )
        if key in seen:
            continue
        if key == ("", ""):
            # Weak fallback key when both identifiers are missing.
            alt_key = ((product.get("name") or "").strip().lower(), str(product.get("price") or ""))
            if alt_key in seen:
                continue
            seen.add(alt_key)
        else:
            seen.add(key)
        unique.append(product)
    return unique

def normalize(products):
    """Pre-process product dicts: clean names, unify currency to CNY, drop nameless rows."""
    cleaned = []
    for item in products:
        if not item.get("name"):
            # Products without a name are useless downstream.
            continue
        item["name"] = clean_text(item["name"])
        if item.get("currency") in ["¥", "¥", "元", None]:
            item["currency"] = "CNY"
        # Coerce string prices to float via the shared parser.
        if isinstance(item.get("price"), str):
            val, cur = parse_price(item["price"])
            item["price"] = val
            item["currency"] = item["currency"] or cur or "CNY"
        cleaned.append(item)
    return cleaned

def save_csv(products, path):
    """Write product dicts to a UTF-8 CSV with a fixed column order; missing keys become empty cells."""
    fields = ["name", "product_url", "image_url", "sku", "brand", "price",
              "currency", "original_price", "discount_rate", "stock_status",
              "rating", "review_count", "description", "source"]
    with open(path, "w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fields)
        writer.writeheader()
        writer.writerows({field: product.get(field) for field in fields} for product in products)

def save_json(products, path):
    """Dump products as pretty-printed UTF-8 JSON (non-ASCII kept readable)."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(products, fh, ensure_ascii=False, indent=2)

def main():
    """Scrape the promo page end-to-end and persist the results to CSV + JSON."""
    session = setup_session()

    if not is_allowed_by_robots(PROMO_URL, USER_AGENT, session):
        logging.error("robots.txt不允许抓取该URL:%s", PROMO_URL)
        return

    logging.info("开始抓取:%s", PROMO_URL)
    soup = BeautifulSoup(fetch_html(PROMO_URL, session), "html.parser")

    # Structured data first (most reliable), then the generic card heuristics.
    products = extract_jsonld_products(soup, PROMO_URL)
    products += extract_microdata_products(soup, PROMO_URL)

    cards = find_product_cards(soup)
    logging.info("发现疑似商品卡片数量:%d", len(cards))
    for card in cards:
        parsed = parse_product_card(card, PROMO_URL)
        # Keep only cards with at least a name or a link.
        if parsed.get("name") or parsed.get("product_url"):
            products.append(parsed)

    # Clean up and de-duplicate before writing out.
    products = deduplicate(normalize(products))

    logging.info("有效商品数量(去重后):%d", len(products))
    if not products:
        logging.warning("未提取到任何商品,可能页面为动态渲染或选择器需调整。")
        return
    save_csv(products, "promo_summer_2025_products.csv")
    save_json(products, "promo_summer_2025_products.json")
    logging.info("已保存:promo_summer_2025_products.csv, promo_summer_2025_products.json")

# Fix: the extracted guard compared the undefined names name/"main"; the
# canonical dunder guard is required for the script to actually run.
if __name__ == "__main__":
    main()

技术要点说明:

  • 解析策略优先使用结构化数据(JSON-LD、Microdata),其次回退到通用卡片选择器与正则提取,适用于未知或变动的页面结构。
  • 价格解析进行了格式容错与货币标准化(统一为CNY),便于后续建模,如价格分布、促销效果评估。
  • 去重优先使用SKU与URL,缺失时采用名称+价格近似去重,减少重复样本对分析的干扰。
  • 输出包含核心字段(名称、链接、图片、SKU、价格、折扣、库存、评分等),可直接导入数据分析管道。若需进一步分析,可在加载数据后进行数值填充、异常值处理、以及基于折扣率或库存状态的分群建模。

以下脚本使用 requests 与 Beautiful Soup 从指定栏目页抓取新闻数据,并包含基础的数据预处理(文本清洗、去重)、分页与速率控制、robots.txt 合规检查,以及可选的详情页补全。输出支持 CSV 与 JSONL,便于后续数据挖掘(特征工程、建模)的输入。

说明与假设:

  • 由于站点结构未知,脚本采用多组通用选择器与回退策略,尽可能抽取标题、链接、发布时间、作者与摘要。
  • 脚本会检查并遵守 robots.txt;请确保抓取行为符合目标站点的使用条款与法律法规。
  • 若页面依赖 JavaScript 动态渲染,需改用渲染型方案(如 Playwright/Selenium);本脚本仅处理静态 HTML。

使用方法:

  1. 安装依赖:pip install requests beautifulsoup4
  2. 运行示例:python scrape_marketing_insights.py
     可选参数示例:python scrape_marketing_insights.py --url https://news.example.net/industry/marketing-insights --output-csv insights.csv --output-jsonl insights.jsonl --max-pages 3 --detail

Python脚本(保存为 scrape_marketing_insights.py):

import argparse
import csv
import json
import logging
import random
import re
import sys
import time
from dataclasses import dataclass, asdict
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag

# -----------------------
# 配置与工具函数
# -----------------------

# Browser-like User-Agent sent with every request.
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0 Safari/537.36"
# Listing page scraped when --url is not supplied.
DEFAULT_URL = "https://news.example.net/industry/marketing-insights"

# Root logger: INFO level, timestamped lines, written explicitly to stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

@dataclass
class Article:
    """One scraped article entry; string fields hold raw page text as found."""

    title: str
    url: str
    # Date string as found on the page (not normalised to ISO8601 here).
    published_at: Optional[str]
    author: Optional[str]
    summary: Optional[str]
    # URL of the listing page this entry was scraped from.
    source: str

def normalize_whitespace(text: Optional[str]) -> Optional[str]:
    """Collapse whitespace runs to single spaces and strip; None passes through."""
    if text is None:
        return None
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()

def build_robot_parser(start_url: str) -> RobotFileParser:
    """Build a RobotFileParser for *start_url*'s host.

    Load failures are logged and swallowed, so the returned parser may hold
    no rules at all.  Fix: logging calls now use lazy %-style arguments
    instead of f-strings, so the message is only formatted when emitted.
    """
    parsed = urlparse(start_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    rp.set_url(robots_url)
    try:
        rp.read()
        logging.info("robots.txt 加载完成: %s", robots_url)
    except Exception as e:
        logging.warning("robots.txt 加载失败: %s -> %s", robots_url, e)
    return rp

def can_fetch(rp: RobotFileParser, url: str, user_agent: str = USER_AGENT) -> bool:
    """Ask *rp* whether *url* may be fetched; any parser error defaults to allow."""
    # NOTE: when robots.txt could not be read, RobotFileParser tends to be
    # permissive; we additionally treat exceptions as "allowed".
    try:
        allowed = rp.can_fetch(user_agent, url)
    except Exception:
        allowed = True
    return allowed

def get_session() -> requests.Session:
    """Create a requests session preloaded with browser-like default headers."""
    default_headers = {
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    session = requests.Session()
    session.headers.update(default_headers)
    return session

# -----------------------
# 解析函数(列表页与详情页)
# -----------------------

def find_article_containers(soup: BeautifulSoup) -> List[Tag]:
    # 通用选择器组合,尽可能兼容不同站点结构
    containers: List[Tag] = []
    containers.extend(soup.find_all("article"))
    containers.extend(soup.select('div[itemtype*="NewsArticle"], div[itemtype*="Article"]'))
    for cls in ["news", "article", "post", "story", "card", "entry"]:
        containers.extend(soup.select(f"div[class*='{cls}'], li[class*='{cls}']"))
    # 去重:以元素的字符串位置标识符(sourceline)+名称近似去重
    seen_ids = set()
    unique = []
    for tag in containers:
        key = (tag.name, getattr(tag, "sourceline", None), getattr(tag, "sourcepos", None))
        if key not in seen_ids:
            seen_ids.add(key)
            unique.append(tag)
    return unique

def extract_title_and_url(container: Tag, base_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (title, absolute_url) for one listing container; either may be None.

    Fix: the original annotated the return type as a parenthesized pair
    ``(Optional[str], Optional[str])``, which is not a valid type expression —
    replaced with ``Tuple[Optional[str], Optional[str]]``.
    """
    # Prefer headline tags; a bare <a> is the last resort.
    title_tag = None
    for sel in ["h1 a", "h2 a", "h3 a", "h1", "h2", "h3", "a"]:
        title_tag = container.select_one(sel)
        if title_tag:
            break
    title = None
    url = None
    if title_tag:
        if title_tag.name == "a":
            title = title_tag.get_text(strip=True) or None
            href = title_tag.get("href")
            if href:
                url = urljoin(base_url, href)
        else:
            title = title_tag.get_text(strip=True) or None
            a = title_tag.find("a", href=True)
            if a and a.get("href"):
                url = urljoin(base_url, a["href"])
    # Fallback: first clickable link with visible text.
    if not url:
        a = container.find("a", href=True)
        if a and a.get_text(strip=True):
            url = urljoin(base_url, a["href"])
            title = title or a.get_text(strip=True)
    return normalize_whitespace(title), url

def extract_published_at(container: Tag) -> Optional[str]:
    # time 标签
    t = container.find("time")
    if t:
        dt = t.get("datetime") or t.get_text(strip=True)
        if dt:
            return normalize_whitespace(dt)
    # meta 标签(结构化)
    for sel in [
        'meta[itemprop="datePublished"]',
        'meta[property="article:published_time"]',
        'meta[name="pubdate"]',
        'span[itemprop="datePublished"]',
    ]:
        m = container.select_one(sel)
        if m:
            content = m.get("content") or m.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None

def extract_author(container: Tag) -> Optional[str]:
    for sel in [
        ".author", ".byline", '[itemprop="author"]', 'a[rel="author"]', 'meta[name="author"]',
    ]:
        el = container.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None

def extract_summary(container: Tag) -> Optional[str]:
    for sel in [".summary", ".dek", ".abstract", "p"]:
        el = container.select_one(sel)
        if el:
            text = el.get_text(" ", strip=True)
            if text and len(text.split()) >= 3:
                return normalize_whitespace(text)
    return None

def parse_detail_page(soup: BeautifulSoup) -> dict:
    # 从详情页提取更规范的字段(若存在)
    data = {"title": None, "published_at": None, "author": None, "summary": None}
    # 标题
    title_tag = soup.find(["h1", "title"])
    if title_tag:
        data["title"] = normalize_whitespace(title_tag.get_text(strip=True))
    og_title = soup.select_one('meta[property="og:title"]')
    if og_title and og_title.get("content"):
        data["title"] = normalize_whitespace(og_title["content"]) or data["title"]
    # 摘要
    og_desc = soup.select_one('meta[property="og:description"], meta[name="description"]')
    if og_desc and og_desc.get("content"):
        data["summary"] = normalize_whitespace(og_desc["content"])
    # 发布时间
    for sel in [
        'meta[property="article:published_time"]',
        'meta[itemprop="datePublished"]',
        'time[datetime]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") or el.get("datetime") or el.get_text(strip=True)
            if content:
                data["published_at"] = normalize_whitespace(content)
                break
    # 作者
    for sel in [
        'meta[name="author"]',
        '[itemprop="author"]',
        '.author', '.byline', 'a[rel="author"]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                data["author"] = normalize_whitespace(content)
                break
    # JSON-LD(可选增强)
    for script in soup.find_all("script", type="application/ld+json"):
        try:
            payload = json.loads(script.string or "")
        except Exception:
            continue
        objs = payload if isinstance(payload, list) else [payload]
        for obj in objs:
            t = obj.get("@type") or obj.get("@graph", [{}])[0].get("@type")
            if isinstance(t, list):
                t = ",".join(t)
            if t and ("NewsArticle" in t or "Article" in t):
                data["title"] = normalize_whitespace(obj.get("headline")) or data["title"]
                data["summary"] = normalize_whitespace(obj.get("description")) or data["summary"]
                pub = obj.get("datePublished") or obj.get("dateCreated")
                if pub:
                    data["published_at"] = normalize_whitespace(pub)
                author = obj.get("author")
                if isinstance(author, dict):
                    data["author"] = normalize_whitespace(author.get("name")) or data["author"]
                elif isinstance(author, list) and author:
                    if isinstance(author[0], dict):
                        data["author"] = normalize_whitespace(author[0].get("name")) or data["author"]
                    elif isinstance(author[0], str):
                        data["author"] = normalize_whitespace(author[0]) or data["author"]
                break
    return data

# -----------------------
# 抓取主流程
# -----------------------

def scrape_list(start_url: str,
                max_pages: int = 1,
                sleep_min: float = 1.0,
                sleep_max: float = 2.5,
                fetch_detail: bool = False) -> List[Article]:
    """Scrape up to *max_pages* listing pages starting at *start_url*.

    Respects robots.txt via ``can_fetch``, sleeps a random interval in
    [sleep_min, sleep_max] between page/detail requests, de-duplicates by
    article URL, and — when *fetch_detail* is True — fetches each article's
    detail page to fill in missing fields.  Returns the collected Articles.
    """
    session = get_session()
    rp = build_robot_parser(start_url)
    articles: List[Article] = []
    seen_urls: Set[str] = set()

    def fetch(url: str) -> Optional[BeautifulSoup]:
        # Shared fetch helper: robots check, GET, parse; None on any failure.
        if not can_fetch(rp, url):
            logging.warning(f"robots 不允许抓取:{url}")
            return None
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code != 200:
                logging.warning(f"请求失败 {resp.status_code}: {url}")
                return None
            return BeautifulSoup(resp.text, "html.parser")
        except requests.RequestException as e:
            logging.error(f"请求异常: {url} -> {e}")
            return None

    page_url = start_url
    # Scheme+host of the start URL, used to absolutize relative links.
    base = f"{urlparse(start_url).scheme}://{urlparse(start_url).netloc}"

    for page_idx in range(max_pages):
        soup = fetch(page_url)
        if soup is None:
            break

        containers = find_article_containers(soup)
        if not containers:
            logging.info("未找到文章容器,尝试使用更宽松的链接抽取")
            # Loose fallback: harvest links straight from the main content blocks.
            for a in soup.select("main a[href], .content a[href], .container a[href]"):
                title = normalize_whitespace(a.get_text(strip=True))
                # Very short link text is unlikely to be an article title.
                if not title or len(title) < 5:
                    continue
                url = urljoin(base, a.get("href"))
                if url in seen_urls:
                    continue
                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=None,
                    author=None,
                    summary=None,
                    source=start_url,
                ))
                seen_urls.add(url)
        else:
            for c in containers:
                title, url = extract_title_and_url(c, base)
                if not url or not title:
                    continue
                if url in seen_urls:
                    continue
                published_at = extract_published_at(c)
                author = extract_author(c)
                summary = extract_summary(c)

                # Optional detail-page enrichment (extra request per article).
                if fetch_detail and can_fetch(rp, url):
                    time.sleep(random.uniform(sleep_min, sleep_max))
                    dsoup = fetch(url)
                    if dsoup:
                        detail = parse_detail_page(dsoup)
                        # Detail-page values override listing values when present.
                        title = detail["title"] or title
                        summary = detail["summary"] or summary
                        published_at = detail["published_at"] or published_at
                        author = detail["author"] or author

                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=published_at,
                    author=author,
                    summary=summary,
                    source=start_url,
                ))
                seen_urls.add(url)

        # Pagination: follow a "next page" link if one exists.
        next_link = soup.select_one('a[rel="next"], a.next, a[aria-label*="下一页"], a[aria-label*="Next"], li.next a')
        if next_link and next_link.get("href"):
            next_url = urljoin(base, next_link.get("href"))
            # Self-referencing "next" links would loop forever; stop instead.
            if next_url == page_url:
                break
            page_url = next_url
            logging.info(f"跳转下一页: {page_url}")
            time.sleep(random.uniform(sleep_min, sleep_max))
        else:
            break

    return articles

# -----------------------
# 输出与 CLI
# -----------------------

def write_csv(path: str, items: List[Article]) -> None:
    """Write articles to a UTF-8 CSV file with a fixed header row."""
    columns = ["title", "url", "published_at", "author", "summary", "source"]
    with open(path, "w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=columns)
        writer.writeheader()
        writer.writerows(asdict(item) for item in items)

def write_jsonl(path: str, items: List[Article]) -> None:
    """Write articles as UTF-8 JSON Lines (one JSON object per line)."""
    with open(path, "w", encoding="utf-8") as fh:
        lines = (json.dumps(asdict(item), ensure_ascii=False) for item in items)
        fh.writelines(line + "\n" for line in lines)

def main():
    """CLI entry point: parse arguments, scrape the listing, write/print results."""
    parser = argparse.ArgumentParser(description="Marketing Insights 数据抓取(BeautifulSoup)")
    parser.add_argument("--url", type=str, default=DEFAULT_URL, help="起始列表页 URL")
    parser.add_argument("--max-pages", type=int, default=1, help="最大分页页数")
    parser.add_argument("--sleep-min", type=float, default=1.0, help="最小请求间隔(秒)")
    parser.add_argument("--sleep-max", type=float, default=2.5, help="最大请求间隔(秒)")
    parser.add_argument("--detail", action="store_true", help="是否抓取详情页补全字段")
    parser.add_argument("--output-csv", type=str, default=None, help="CSV 输出路径")
    parser.add_argument("--output-jsonl", type=str, default=None, help="JSONL 输出路径")
    args = parser.parse_args()

    items = scrape_list(
        start_url=args.url,
        max_pages=args.max_pages,
        sleep_min=args.sleep_min,
        sleep_max=args.sleep_max,
        fetch_detail=args.detail,
    )

    logging.info(f"抓取完成,共 {len(items)} 条")
    # Either output flag may be given; both may be combined.
    if args.output_csv:
        write_csv(args.output_csv, items)
        logging.info(f"CSV 已写入: {args.output_csv}")
    if args.output_jsonl:
        write_jsonl(args.output_jsonl, items)
        logging.info(f"JSONL 已写入: {args.output_jsonl}")

    # If no output file was specified, print the first few entries as a sample.
    if not args.output_csv and not args.output_jsonl:
        for it in items[:5]:
            print(json.dumps(asdict(it), ensure_ascii=False))

# Standard script entry guard: run the CLI only when executed directly.
if __name__ == "__main__":
    main()

数据挖掘建议(采集到数据后的处理要点):

  • 字段标准化:将 published_at 统一转换为 ISO8601;作者名进行规范化与实体消歧。
  • 去重策略:依据 URL、标题+发布时间的组合键进行二次去重。
  • 内容质量过滤:摘要长度阈值、关键词过滤(如“营销”“洞察”等)、语言检测,提升样本纯度。
  • 结构化增强:若可用,优先采用页面中的结构化数据(JSON-LD)填补缺失字段。
  • 合规与速率:遵守 robots 与站点条款;设置合理的间隔与并发限流,避免对站点造成压力。

示例详情

📖 如何使用

30秒出活:复制 → 粘贴 → 搞定
与其花几十分钟和AI聊天、试错,不如直接复制这些经过千人验证的模板,修改几个 {{变量}} 就能立刻获得专业级输出。省下来的时间,足够你轻松享受两杯咖啡!
加载中...
💬 不会填参数?让 AI 反过来问你
不确定变量该填什么?一键转为对话模式,AI 会像资深顾问一样逐步引导你,问几个问题就能自动生成完美匹配你需求的定制结果。零门槛,开口就行。
转为对话模式
🚀 告别复制粘贴,Chat 里直接调用
无需切换,输入 / 唤醒 8000+ 专家级提示词。 插件将全站提示词库深度集成于 Chat 输入框。基于当前对话语境,系统智能推荐最契合的 Prompt 并自动完成参数化,让海量资源触手可及,从此彻底告别"手动搬运"。
即将推出
🔌 接口一调,提示词自己会进化
手动跑一次还行,跑一百次呢?通过 API 接口动态注入变量,接入批量评价引擎,让程序自动迭代出更高质量的提示词方案。Prompt 会自己进化,你只管收结果。
发布 API
🤖 一键变成你的专属 Agent 应用
不想每次都配参数?把这条提示词直接发布成独立 Agent,内嵌图片生成、参数优化等工具,分享链接就能用。给团队或客户一个"开箱即用"的完整方案。
创建 Agent

✅ 特性总结

一键生成可运行的网页数据抓取脚本,输入URL即得,快速开启采集。
自动识别页面结构与内容块,灵活提取标题、文本、链接等关键信息。
可按需求定制抓取范围与规则,避免冗余信息,聚焦业务必需数据。
生成清晰的步骤说明与使用指南,便于非技术同事快速上手。
内置数据清洗与预处理建议,采集后即可用于分析与报表。
支持多语言输出与注释,方便跨团队协作与对外交付。
提供异常与反爬提示方案,提升采集稳定性与任务完成率。
可模板化复用,批量替换URL与参数,轻松扩展到多站点。
与营销、竞品、内容运营场景紧密结合,直接驱动增长与决策。

🎯 解决的问题

将“给定网址+期望输出语言”一键转化为可直接运行的网页数据抓取脚本与清晰操作说明,面向增长、运营、产品、竞品研究与数据分析等高频场景,帮助用户在分钟级完成网页信息采集、基础清洗与结构化整理,显著降低技术门槛,提升交付确定性,为后续分析、可视化与报告撰写提供可靠的数据输入。

🕒 版本历史

当前版本
v2.1 2024-01-15
优化输出结构,增强情节连贯性
  • ✨ 新增章节节奏控制参数
  • 🔧 优化人物关系描述逻辑
  • 📝 改进主题深化引导语
  • 🎯 增强情节转折点设计
v2.0 2023-12-20
重构提示词架构,提升生成质量
  • 🚀 全新的提示词结构设计
  • 📊 增加输出格式化选项
  • 💡 优化角色塑造引导
v1.5 2023-11-10
修复已知问题,提升稳定性
  • 🐛 修复长文本处理bug
  • ⚡ 提升响应速度
v1.0 2023-10-01
首次发布
  • 🎉 初始版本上线
COMING SOON
版本历史追踪,即将启航
记录每一次提示词的进化与升级,敬请期待。

💬 用户评价

4.8
⭐⭐⭐⭐⭐
基于 28 条评价
5星
85%
4星
12%
3星
3%
👤
电商运营 - 张先生
⭐⭐⭐⭐⭐ 2025-01-15
双十一用这个提示词生成了20多张海报,效果非常好!点击率提升了35%,节省了大量设计时间。参数调整很灵活,能快速适配不同节日。
效果好 节省时间
👤
品牌设计师 - 李女士
⭐⭐⭐⭐⭐ 2025-01-10
作为设计师,这个提示词帮我快速生成创意方向,大大提升了工作效率。生成的海报氛围感很强,稍作调整就能直接使用。
创意好 专业
COMING SOON
用户评价与反馈系统,即将上线
倾听真实反馈,在这里留下您的使用心得,敬请期待。
加载中...