Writing a Data Scraping Script

Updated Sep 27, 2025

Write a Python script with Beautiful Soup that extracts data from a specified URL.

Example 1

The following script uses requests and BeautifulSoup to scrape product data from the given promotion page, then performs basic preprocessing (text cleanup, price parsing, field normalization), deduplication, and structured output (CSV, JSON). It includes a robots.txt compliance check, retries with rate limiting, and multi-strategy parsing (page cards, Microdata, JSON-LD) to improve robustness against unknown front-end structures. It runs as-is and saves to the current directory by default.

Usage notes (brief):
- Dependencies: Python 3.8+, requests, beautifulsoup4
- Install: pip install requests beautifulsoup4
- Run: python scrape_promo.py
- Output: promo_summer_2025_products.csv, promo_summer_2025_products.json

Script (scrape_promo.py):

```python
import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import logging
import time
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

PROMO_URL = "https://shop.testsite.cn/promo/summer-2025"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
REQUEST_TIMEOUT = 10
RATE_LIMIT_SECONDS = 1.0  # basic rate limit to avoid sending requests too quickly

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

def setup_session():
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET"])
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    })
    return session

def is_allowed_by_robots(url, user_agent=USER_AGENT, session=None):
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    try:
        # Fetch and parse robots.txt
        if session:
            resp = session.get(robots_url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                rp.parse(resp.text.splitlines())
            else:
                # robots.txt unavailable; default to allowed, but proceed with caution
                logging.warning("Could not fetch robots.txt, treating as allowed: %s", robots_url)
                return True
        else:
            rp.set_url(robots_url)
            rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception as e:
        logging.warning("检查 robots.txt 时出错:%s", e)
        return True

def fetch_html(url, session):
    time.sleep(RATE_LIMIT_SECONDS)
    resp = session.get(url, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    # Pick a sensible encoding
    if not resp.encoding or resp.encoding.lower() == "iso-8859-1":
        resp.encoding = resp.apparent_encoding or "utf-8"
    return resp.text

def clean_text(text):
    if text is None:
        return None
    # Collapse extra whitespace and invisible characters
    return re.sub(r"\s+", " ", text).strip()

def parse_price(text):
    """
    Parse a price string and return (value: float, currency: str or None).
    Supported formats include: ¥1,299.00, ¥899, 1299 元, CNY 499
    """
    if not text:
        return None, None
    t = clean_text(text)
    # Match the numeric price first
    m = re.search(r"(?P<currency>¥|¥|CNY|RMB|元)?\s*(?P<value>\d{1,3}(?:[,\s]\d{3})*(?:\.\d+)?|\d+(?:\.\d+)?)", t)
    if not m:
        return None, None
    raw_val = m.group("value").replace(",", "").replace(" ", "")
    try:
        val = float(raw_val)
    except Exception:
        val = None
    cur = m.group("currency")
    # Normalize the currency
    if cur in ["¥", "¥", "元"]:
        cur = "CNY"
    return val, cur

def parse_discount(text):
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d{1,3})\s*%", t)
    if m:
        pct = int(m.group(1))
        if 0 < pct <= 100:
            return pct / 100.0
    return None

def parse_rating(text):
    # Supports formats like "4.5/5", "4.0 星", "4 stars"
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d+(?:\.\d+)?)\s*(?:/5|星|stars)", t, flags=re.IGNORECASE)
    return float(m.group(1)) if m else None

def extract_jsonld_products(soup, base_url):
    products = []
    for tag in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(tag.string or tag.get_text())
        except Exception:
            continue
        items = []
        if isinstance(data, list):
            items = data
        elif isinstance(data, dict):
            # If it's an ItemList, expand itemListElement
            if data.get("@type") == "ItemList" and "itemListElement" in data:
                for el in data["itemListElement"]:
                    # List entries may be ListItem objects; take their nested item
                    obj = el.get("item") if isinstance(el, dict) else el
                    if obj:
                        items.append(obj)
            else:
                items = [data]
        for obj in items:
            if not isinstance(obj, dict):
                continue
            typ = obj.get("@type")
            if isinstance(typ, list):
                is_product = "Product" in typ
            else:
                is_product = typ == "Product"
            if not is_product:
                continue
            name = clean_text(obj.get("name"))
            url = obj.get("url")
            image = obj.get("image")
            description = clean_text(obj.get("description"))
            sku = obj.get("sku") or obj.get("mpn")
            brand = None
            if isinstance(obj.get("brand"), dict):
                brand = obj["brand"].get("name")
            offers = obj.get("offers")
            price = None
            currency = None
            original_price = None
            availability = None
            if isinstance(offers, list) and offers:
                off = offers[0]
                price, currency, availability = off.get("price"), off.get("priceCurrency"), off.get("availability")
            elif isinstance(offers, dict):
                price, currency, availability = offers.get("price"), offers.get("priceCurrency"), offers.get("availability")
            # Convert the price to a number
            price_val = float(price) if isinstance(price, (int, float, str)) and re.match(r"^\d+(\.\d+)?$", str(price)) else None
            # Normalize the URL and image to absolute form
            url = urljoin(base_url, url) if url else None
            image = urljoin(base_url, image) if isinstance(image, str) else image
            products.append({
                "name": name,
                "product_url": url,
                "image_url": image,
                "sku": sku,
                "brand": brand,
                "price": price_val,
                "currency": currency,
                "original_price": original_price,
                "discount_rate": None,
                "stock_status": availability,
                "rating": None,
                "review_count": None,
                "description": description,
                "source": "jsonld"
            })
    return products

def find_product_cards(soup):
    # Multiple selector strategies to find likely product cards
    selectors = [
        'div[class*="product"]', 'li[class*="product"]', 'article[class*="product"]',
        'div[class*="item"]', 'li[class*="item"]',
        'div[class*="card"]', 'li[class*="card"]'
    ]
    cards = []
    seen = set()
    for sel in selectors:
        for el in soup.select(sel):
            # Skip duplicates of the same element
            oid = id(el)
            if oid not in seen:
                seen.add(oid)
                cards.append(el)
    return cards

def first_match_text(el, selectors_or_class_keywords):
    # selectors_or_class_keywords may be a list of CSS selectors or of class-name keywords
    if isinstance(selectors_or_class_keywords, (list, tuple)):
        # Try them as CSS selectors first
        for sel in selectors_or_class_keywords:
            found = el.select_one(sel)
            if found and clean_text(found.get_text()):
                return clean_text(found.get_text())
        # Then fall back to class-keyword matching
        for kw in selectors_or_class_keywords:
            for tag in el.find_all(True, class_=lambda c: c and kw in " ".join(c) if isinstance(c, list) else (c and kw in c)):
                txt = clean_text(tag.get_text())
                if txt:
                    return txt
    return None

def get_attr_chain(el, attrs):
    for a in attrs:
        v = el.get(a)
        if v:
            return v
    return None

def parse_product_card(card, base_url):
    # Name
    name = None
    name_selectors = ['h1', 'h2', 'h3', 'a[class*="title"]', 'a[class*="name"]', 'div[class*="title"]', 'span[class*="title"]']
    for sel in name_selectors:
        tag = card.select_one(sel)
        if tag:
            txt = clean_text(tag.get_text())
            if txt:
                name = txt
                break
    if not name:
        # Fall back to the image alt text
        img = card.find("img")
        if img:
            alt = clean_text(img.get("alt"))
            if alt:
                name = alt

    # Product link
    product_url = None
    a = card.find("a", href=True)
    if a:
        href = a.get("href")
        if href and not href.startswith("#"):
            product_url = urljoin(base_url, href)

    # Image
    image_url = None
    img = card.find("img")
    if img:
        image_url = get_attr_chain(img, ["data-src", "data-original", "src"])
        if image_url:
            image_url = urljoin(base_url, image_url)

    # SKU
    sku = get_attr_chain(card, ["data-sku", "data-id"])
    if not sku:
        m = re.search(r"(SKU|货号)\s*[::]\s*([A-Za-z0-9\-_]+)", card.get_text())
        if m:
            sku = m.group(2)

    # Price fields
    price = None
    currency = None
    original_price = None

    # Prefer parsing from obvious price blocks
    price_blocks = card.find_all(True, class_=lambda c: c and (
        ("price" in " ".join(c)) or ("sale" in " ".join(c)) or ("final" in " ".join(c))
    ) if isinstance(c, list) else (c and ("price" in c or "sale" in c or "final" in c)))
    if price_blocks:
        # Current price
        for pb in price_blocks:
            val, cur = parse_price(pb.get_text())
            if val:
                price, currency = val, cur or currency
                break
        # Original price (often struck through or labeled was/list)
        orig_blocks = card.find_all(["del", "s", "strike"]) + card.find_all(True, class_=lambda c: c and ("original" in c or "was" in c or "list" in c))
        for ob in orig_blocks:
            val, cur = parse_price(ob.get_text())
            if val:
                original_price = val
                if not currency and cur:
                    currency = cur
                break

    # Discount
    discount_rate = None
    disc_blocks = card.find_all(True, class_=lambda c: c and ("discount" in c or "off" in c))
    for db in disc_blocks:
        dr = parse_discount(db.get_text())
        if dr:
            discount_rate = dr
            break
    # If no discount was parsed but both prices exist, derive it
    if not discount_rate and original_price and price and original_price > 0 and price <= original_price:
        discount_rate = 1.0 - (price / original_price)

    # Rating and review count
    rating = None
    review_count = None
    rating_blocks = card.find_all(True, class_=lambda c: c and ("rating" in c or "stars" in c))
    for rb in rating_blocks:
        r = parse_rating(rb.get_text())
        if r:
            rating = r
            break
    # Review count
    m = re.search(r"(\d+)\s*(条评|评论|reviews)", card.get_text(), flags=re.IGNORECASE)
    if m:
        review_count = int(m.group(1))

    # Stock status
    stock_status = None
    text_all = card.get_text()
    if re.search(r"(有货|现货|在库)", text_all):
        stock_status = "InStock"
    elif re.search(r"(缺货|售罄|无货|OutOfStock)", text_all, flags=re.IGNORECASE):
        stock_status = "OutOfStock"

    return {
        "name": name,
        "product_url": product_url,
        "image_url": image_url,
        "sku": sku,
        "brand": None,
        "price": price,
        "currency": currency,
        "original_price": original_price,
        "discount_rate": discount_rate,
        "stock_status": stock_status,
        "rating": rating,
        "review_count": review_count,
        "description": None,
        "source": "card"
    }

def extract_microdata_products(soup, base_url):
    products = []
    for prod in soup.find_all(attrs={"itemscope": True, "itemtype": re.compile("Product", re.IGNORECASE)}):
        def get_itemprop(prop):
            tag = prod.find(attrs={"itemprop": prop})
            return clean_text(tag.get_text()) if tag and tag.get_text() else (tag.get("content") if tag else None)
        name = get_itemprop("name")
        url = get_itemprop("url")
        image = get_itemprop("image")
        sku = get_itemprop("sku") or get_itemprop("mpn")
        price = get_itemprop("price")
        currency = get_itemprop("priceCurrency")
        # Convert the price to a number
        price_val = float(price) if price and re.match(r"^\d+(\.\d+)?$", str(price)) else None
        products.append({
            "name": name,
            "product_url": urljoin(base_url, url) if url else None,
            "image_url": urljoin(base_url, image) if image else None,
            "sku": sku,
            "brand": get_itemprop("brand"),
            "price": price_val,
            "currency": currency or "CNY",
            "original_price": None,
            "discount_rate": None,
            "stock_status": None,
            "rating": None,
            "review_count": None,
            "description": get_itemprop("description"),
            "source": "microdata"
        })
    return products

def deduplicate(products):
    seen = set()
    out = []
    for p in products:
        key = (
            (p.get("sku") or "").strip().lower(),
            (p.get("product_url") or "").strip().lower()
        )
        if key in seen:
            continue
        # Weak dedup: if both SKU and URL are missing, approximate with name + price
        if not key[0] and not key[1]:
            alt_key = ((p.get("name") or "").strip().lower(), str(p.get("price") or ""))
            if alt_key in seen:
                continue
            seen.add(alt_key)
        else:
            seen.add(key)
        out.append(p)
    return out

def normalize(products):
    """
    Preprocess: clean text, normalize currency, drop items without a name.
    """
    out = []
    for p in products:
        if not p.get("name"):
            continue
        p["name"] = clean_text(p["name"])
        if p.get("currency") in ["¥", "¥", "元", None]:
            p["currency"] = "CNY"
        # Ensure price is a float or None
        if isinstance(p.get("price"), str):
            val, cur = parse_price(p["price"])
            p["price"] = val
            p["currency"] = p["currency"] or cur or "CNY"
        out.append(p)
    return out

def save_csv(products, path):
    fields = ["name", "product_url", "image_url", "sku", "brand", "price", "currency", "original_price", "discount_rate", "stock_status", "rating", "review_count", "description", "source"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for p in products:
            writer.writerow({k: p.get(k) for k in fields})

def save_json(products, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(products, f, ensure_ascii=False, indent=2)

def main():
    session = setup_session()

    if not is_allowed_by_robots(PROMO_URL, USER_AGENT, session):
        logging.error("robots.txt不允许抓取该URL:%s", PROMO_URL)
        return

    logging.info("开始抓取:%s", PROMO_URL)
    html = fetch_html(PROMO_URL, session)
    soup = BeautifulSoup(html, "html.parser")

    # Multi-strategy extraction
    products = []
    products += extract_jsonld_products(soup, PROMO_URL)
    products += extract_microdata_products(soup, PROMO_URL)

    # Card-based parsing
    cards = find_product_cards(soup)
    logging.info("Candidate product cards found: %d", len(cards))
    for card in cards:
        p = parse_product_card(card, PROMO_URL)
        # Require at least a name or a link
        if p.get("name") or p.get("product_url"):
            products.append(p)

    # Preprocess and deduplicate
    products = normalize(products)
    products = deduplicate(products)

    logging.info("有效商品数量(去重后):%d", len(products))
    if not products:
        logging.warning("未提取到任何商品,可能页面为动态渲染或选择器需调整。")
    else:
        save_csv(products, "promo_summer_2025_products.csv")
        save_json(products, "promo_summer_2025_products.json")
        logging.info("已保存:promo_summer_2025_products.csv, promo_summer_2025_products.json")

if __name__ == "__main__":
    main()

```

Technical notes:
- The parser prefers structured data (JSON-LD, Microdata) and falls back to generic card selectors with regex extraction, which suits unknown or changing page structures.
- Price parsing tolerates varied formats and normalizes the currency (to CNY), simplifying downstream modeling such as price-distribution analysis or promotion-effect evaluation.
- Deduplication relies on SKU and URL first; when both are missing, it approximates with name + price to reduce duplicate samples that would skew analysis.
- The output covers the core fields (name, link, image, SKU, price, discount, stock, rating, etc.) and can be loaded directly into a data-analysis pipeline. For further analysis, apply missing-value imputation, outlier handling, and segmentation by discount rate or stock status after loading the data, as sketched below.
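
A minimal post-processing sketch along those lines, assuming only the standard library and the JSON file produced above; the quality filter, discount summary, and stock segmentation are illustrative additions, not part of the script itself:

```python
import json
from statistics import mean, median

# Load the structured output written by scrape_promo.py
with open("promo_summer_2025_products.json", encoding="utf-8") as f:
    products = json.load(f)

# Basic quality filter: keep items with a name and a numeric price (illustrative rule)
priced = [p for p in products if p.get("name") and isinstance(p.get("price"), (int, float))]

# Discount-rate distribution over items where a discount could be determined
discounts = [p["discount_rate"] for p in priced if p.get("discount_rate")]
if discounts:
    print(f"discounted items: {len(discounts)}, "
          f"mean discount: {mean(discounts):.1%}, median: {median(discounts):.1%}")

# Rough segmentation by stock status for promotion-effect comparison
in_stock = [p for p in priced if "InStock" in (p.get("stock_status") or "")]
print(f"priced items: {len(priced)}, in stock: {len(in_stock)}")
```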

Example 2

The following script uses requests and Beautiful Soup to scrape news data from the given section page, with basic preprocessing (text cleanup, deduplication), pagination and rate control, a robots.txt compliance check, and optional detail-page enrichment. Output is available as CSV and JSONL, ready to feed into downstream data-mining steps (feature engineering, modeling).

Notes and assumptions:
- Because the site structure is unknown, the script uses several sets of generic selectors with fallback strategies to extract the title, link, publication time, author, and summary wherever possible.
- The script checks and respects robots.txt; make sure your scraping complies with the target site's terms of use and applicable laws and regulations.
- If the page relies on JavaScript for rendering, switch to a rendering-based approach (e.g. Playwright or Selenium); this script only handles static HTML (see the sketch after this list).
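
A minimal sketch of that swap, assuming Playwright is installed (pip install playwright, then playwright install chromium); fetch_rendered_html is an illustrative helper, and the rendered HTML can then be fed to the same BeautifulSoup logic used below:

```python
# Sketch only: fetch JavaScript-rendered HTML with Playwright instead of requests.
from playwright.sync_api import sync_playwright

def fetch_rendered_html(url: str) -> str:
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        # Wait for network activity to settle so client-side content has rendered
        page.goto(url, wait_until="networkidle")
        html = page.content()
        browser.close()
        return html

# Usage: soup = BeautifulSoup(fetch_rendered_html(DEFAULT_URL), "html.parser")
```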

How to use:
1) Install dependencies: pip install requests beautifulsoup4
2) Run the example: python scrape_marketing_insights.py
   Example with optional arguments: python scrape_marketing_insights.py --url https://news.example.net/industry/marketing-insights --output-csv insights.csv --output-jsonl insights.jsonl --max-pages 3 --detail

Python script (save as scrape_marketing_insights.py):

```python
import argparse
import csv
import json
import logging
import random
import re
import sys
import time
from dataclasses import dataclass, asdict
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag

# -----------------------
# Configuration and helpers
# -----------------------

USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0 Safari/537.36"
DEFAULT_URL = "https://news.example.net/industry/marketing-insights"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

@dataclass
class Article:
    title: str
    url: str
    published_at: Optional[str]
    author: Optional[str]
    summary: Optional[str]
    source: str

def normalize_whitespace(text: Optional[str]) -> Optional[str]:
    if text is None:
        return None
    return re.sub(r"\s+", " ", text).strip()

def build_robot_parser(start_url: str) -> RobotFileParser:
    parsed = urlparse(start_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    rp.set_url(robots_url)
    try:
        rp.read()
        logging.info(f"robots.txt loaded: {robots_url}")
    except Exception as e:
        logging.warning(f"Failed to load robots.txt: {robots_url} -> {e}")
        # If robots.txt could not be read at all, fall back to allowing fetches;
        # otherwise RobotFileParser would report every URL as disallowed.
        rp.allow_all = True
    return rp

def can_fetch(rp: RobotFileParser, url: str, user_agent: str = USER_AGENT) -> bool:
    # Defensive wrapper: default to allowed if can_fetch itself fails unexpectedly
    try:
        return rp.can_fetch(user_agent, url)
    except Exception:
        return True

def get_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    })
    return s

# -----------------------
# Parsing helpers (listing pages and detail pages)
# -----------------------

def find_article_containers(soup: BeautifulSoup) -> List[Tag]:
    # Generic selector combinations to cope with varied site structures
    containers: List[Tag] = []
    containers.extend(soup.find_all("article"))
    containers.extend(soup.select('div[itemtype*="NewsArticle"], div[itemtype*="Article"]'))
    for cls in ["news", "article", "post", "story", "card", "entry"]:
        containers.extend(soup.select(f"div[class*='{cls}'], li[class*='{cls}']"))
    # Dedup: approximate identity via tag name plus source position (sourceline/sourcepos)
    seen_ids = set()
    unique = []
    for tag in containers:
        key = (tag.name, getattr(tag, "sourceline", None), getattr(tag, "sourcepos", None))
        if key not in seen_ids:
            seen_ids.add(key)
            unique.append(tag)
    return unique

def extract_title_and_url(container: Tag, base_url: str) -> Tuple[Optional[str], Optional[str]]:
    # Prefer extracting from heading tags
    title_tag = None
    for sel in ["h1 a", "h2 a", "h3 a", "h1", "h2", "h3", "a"]:
        title_tag = container.select_one(sel)
        if title_tag:
            break
    title = None
    url = None
    if title_tag:
        if title_tag.name == "a":
            title = title_tag.get_text(strip=True) or None
            href = title_tag.get("href")
            if href:
                url = urljoin(base_url, href)
        else:
            title = title_tag.get_text(strip=True) or None
            a = title_tag.find("a", href=True)
            if a and a.get("href"):
                url = urljoin(base_url, a["href"])
    # Fallback: any clickable link
    if not url:
        a = container.find("a", href=True)
        if a and a.get_text(strip=True):
            url = urljoin(base_url, a["href"])
            title = title or a.get_text(strip=True)
    return normalize_whitespace(title), url

def extract_published_at(container: Tag) -> Optional[str]:
    # <time> tag
    t = container.find("time")
    if t:
        dt = t.get("datetime") or t.get_text(strip=True)
        if dt:
            return normalize_whitespace(dt)
    # Structured meta/span tags
    for sel in [
        'meta[itemprop="datePublished"]',
        'meta[property="article:published_time"]',
        'meta[name="pubdate"]',
        'span[itemprop="datePublished"]',
    ]:
        m = container.select_one(sel)
        if m:
            content = m.get("content") or m.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None

def extract_author(container: Tag) -> Optional[str]:
    for sel in [
        ".author", ".byline", '[itemprop="author"]', 'a[rel="author"]', 'meta[name="author"]',
    ]:
        el = container.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None

def extract_summary(container: Tag) -> Optional[str]:
    for sel in [".summary", ".dek", ".abstract", "p"]:
        el = container.select_one(sel)
        if el:
            text = el.get_text(" ", strip=True)
            if text and len(text.split()) >= 3:
                return normalize_whitespace(text)
    return None

def parse_detail_page(soup: BeautifulSoup) -> dict:
    # Extract better-structured fields from the detail page (when present)
    data = {"title": None, "published_at": None, "author": None, "summary": None}
    # Title
    title_tag = soup.find(["h1", "title"])
    if title_tag:
        data["title"] = normalize_whitespace(title_tag.get_text(strip=True))
    og_title = soup.select_one('meta[property="og:title"]')
    if og_title and og_title.get("content"):
        data["title"] = normalize_whitespace(og_title["content"]) or data["title"]
    # Summary
    og_desc = soup.select_one('meta[property="og:description"], meta[name="description"]')
    if og_desc and og_desc.get("content"):
        data["summary"] = normalize_whitespace(og_desc["content"])
    # Publication time
    for sel in [
        'meta[property="article:published_time"]',
        'meta[itemprop="datePublished"]',
        'time[datetime]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") or el.get("datetime") or el.get_text(strip=True)
            if content:
                data["published_at"] = normalize_whitespace(content)
                break
    # Author
    for sel in [
        'meta[name="author"]',
        '[itemprop="author"]',
        '.author', '.byline', 'a[rel="author"]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                data["author"] = normalize_whitespace(content)
                break
    # JSON-LD (optional enrichment)
    for script in soup.find_all("script", type="application/ld+json"):
        try:
            payload = json.loads(script.string or "")
        except Exception:
            continue
        objs = payload if isinstance(payload, list) else [payload]
        for obj in objs:
            if not isinstance(obj, dict):  # skip non-dict JSON-LD entries
                continue
            graph = obj.get("@graph") or [{}]
            t = obj.get("@type") or (graph[0].get("@type") if isinstance(graph[0], dict) else None)
            if isinstance(t, list):
                t = ",".join(t)
            if t and ("NewsArticle" in t or "Article" in t):
                data["title"] = normalize_whitespace(obj.get("headline")) or data["title"]
                data["summary"] = normalize_whitespace(obj.get("description")) or data["summary"]
                pub = obj.get("datePublished") or obj.get("dateCreated")
                if pub:
                    data["published_at"] = normalize_whitespace(pub)
                author = obj.get("author")
                if isinstance(author, dict):
                    data["author"] = normalize_whitespace(author.get("name")) or data["author"]
                elif isinstance(author, list) and author:
                    if isinstance(author[0], dict):
                        data["author"] = normalize_whitespace(author[0].get("name")) or data["author"]
                    elif isinstance(author[0], str):
                        data["author"] = normalize_whitespace(author[0]) or data["author"]
                break
    return data

# -----------------------
# Main scraping flow
# -----------------------

def scrape_list(start_url: str,
                max_pages: int = 1,
                sleep_min: float = 1.0,
                sleep_max: float = 2.5,
                fetch_detail: bool = False) -> List[Article]:
    session = get_session()
    rp = build_robot_parser(start_url)
    articles: List[Article] = []
    seen_urls: Set[str] = set()

    def fetch(url: str) -> Optional[BeautifulSoup]:
        if not can_fetch(rp, url):
            logging.warning(f"robots 不允许抓取:{url}")
            return None
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code != 200:
                logging.warning(f"请求失败 {resp.status_code}: {url}")
                return None
            return BeautifulSoup(resp.text, "html.parser")
        except requests.RequestException as e:
            logging.error(f"请求异常: {url} -> {e}")
            return None

    page_url = start_url
    base = f"{urlparse(start_url).scheme}://{urlparse(start_url).netloc}"

    for page_idx in range(max_pages):
        soup = fetch(page_url)
        if soup is None:
            break

        containers = find_article_containers(soup)
        if not containers:
            logging.info("未找到文章容器,尝试使用更宽松的链接抽取")
            # 宽松抽取:从主体区块抽取链接
            for a in soup.select("main a[href], .content a[href], .container a[href]"):
                title = normalize_whitespace(a.get_text(strip=True))
                if not title or len(title) < 5:
                    continue
                url = urljoin(base, a.get("href"))
                if url in seen_urls:
                    continue
                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=None,
                    author=None,
                    summary=None,
                    source=start_url,
                ))
                seen_urls.add(url)
        else:
            for c in containers:
                title, url = extract_title_and_url(c, base)
                if not url or not title:
                    continue
                if url in seen_urls:
                    continue
                published_at = extract_published_at(c)
                author = extract_author(c)
                summary = extract_summary(c)

                # Optional enrichment from the detail page
                if fetch_detail and can_fetch(rp, url):
                    time.sleep(random.uniform(sleep_min, sleep_max))
                    dsoup = fetch(url)
                    if dsoup:
                        detail = parse_detail_page(dsoup)
                        title = detail["title"] or title
                        summary = detail["summary"] or summary
                        published_at = detail["published_at"] or published_at
                        author = detail["author"] or author

                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=published_at,
                    author=author,
                    summary=summary,
                    source=start_url,
                ))
                seen_urls.add(url)

        # Pagination: look for the next-page link
        next_link = soup.select_one('a[rel="next"], a.next, a[aria-label*="下一页"], a[aria-label*="Next"], li.next a')
        if next_link and next_link.get("href"):
            next_url = urljoin(base, next_link.get("href"))
            if next_url == page_url:
                break
            page_url = next_url
            logging.info(f"跳转下一页: {page_url}")
            time.sleep(random.uniform(sleep_min, sleep_max))
        else:
            break

    return articles

# -----------------------
# Output and CLI
# -----------------------

def write_csv(path: str, items: List[Article]) -> None:
    with open(path, "w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["title", "url", "published_at", "author", "summary", "source"])
        w.writeheader()
        for it in items:
            w.writerow(asdict(it))

def write_jsonl(path: str, items: List[Article]) -> None:
    with open(path, "w", encoding="utf-8") as f:
        for it in items:
            f.write(json.dumps(asdict(it), ensure_ascii=False) + "\n")

def main():
    parser = argparse.ArgumentParser(description="Marketing Insights scraper (BeautifulSoup)")
    parser.add_argument("--url", type=str, default=DEFAULT_URL, help="Starting listing-page URL")
    parser.add_argument("--max-pages", type=int, default=1, help="Maximum number of listing pages")
    parser.add_argument("--sleep-min", type=float, default=1.0, help="Minimum delay between requests (seconds)")
    parser.add_argument("--sleep-max", type=float, default=2.5, help="Maximum delay between requests (seconds)")
    parser.add_argument("--detail", action="store_true", help="Also fetch detail pages to fill in missing fields")
    parser.add_argument("--output-csv", type=str, default=None, help="CSV output path")
    parser.add_argument("--output-jsonl", type=str, default=None, help="JSONL output path")
    args = parser.parse_args()

    items = scrape_list(
        start_url=args.url,
        max_pages=args.max_pages,
        sleep_min=args.sleep_min,
        sleep_max=args.sleep_max,
        fetch_detail=args.detail,
    )

    logging.info(f"抓取完成,共 {len(items)} 条")
    if args.output_csv:
        write_csv(args.output_csv, items)
        logging.info(f"CSV 已写入: {args.output_csv}")
    if args.output_jsonl:
        write_jsonl(args.output_jsonl, items)
        logging.info(f"JSONL 已写入: {args.output_jsonl}")

    # 如果未指定输出文件,打印示例前几条
    if not args.output_csv and not args.output_jsonl:
        for it in items[:5]:
            print(json.dumps(asdict(it), ensure_ascii=False))

if __name__ == "__main__":
    main()
```

Data-mining suggestions (key processing steps after collection):
- Field standardization: convert published_at to ISO 8601 and normalize author names with entity disambiguation (see the sketch after this list).
- Deduplication: run a second pass keyed on the URL and on the title + publication-time combination.
- Content-quality filtering: apply summary-length thresholds, keyword filters (e.g. "营销", "洞察"), and language detection to improve sample purity.
- Structural enrichment: where available, prefer the page's structured data (JSON-LD) to fill in missing fields.
- Compliance and rate limiting: respect robots.txt and site terms; use reasonable delays and concurrency limits to avoid putting pressure on the site.
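
A minimal sketch of the first two suggestions, assuming the JSONL output above and only the standard library; to_iso8601 and load_and_deduplicate are illustrative helpers, and the date formats listed are examples rather than an exhaustive set:

```python
import json
from datetime import datetime

def to_iso8601(raw):
    """Best-effort conversion of published_at strings to ISO 8601; unknown formats pass through."""
    if not raw:
        return None
    for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d"):
        try:
            return datetime.strptime(raw.strip(), fmt).isoformat()
        except ValueError:
            continue
    return raw  # leave unparsed values for manual review

def load_and_deduplicate(path="insights.jsonl"):
    seen_urls, seen_title_time, rows = set(), set(), []
    with open(path, encoding="utf-8") as f:
        for line in f:
            item = json.loads(line)
            item["published_at"] = to_iso8601(item.get("published_at"))
            # Second-pass dedup keyed on URL and on title + publication time
            if item.get("url") in seen_urls or (item.get("title"), item.get("published_at")) in seen_title_time:
                continue
            seen_urls.add(item.get("url"))
            seen_title_time.add((item.get("title"), item.get("published_at")))
            rows.append(item)
    return rows
```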

Target users

Growth and marketing operations

Quickly scrape competitor campaigns, prices, and copy into readable data that supports campaign strategy and ad-placement optimization.

Data analysts

Collect source data in bulk and, together with the cleaning and preprocessing suggestions, shorten data-preparation cycles and speed up insight delivery.

Content editors and new-media teams

Aggregate industry news and source links into normalized output, improving topic-selection efficiency and content iteration speed.

E-commerce sellers and small business owners

Monitor platform best-seller lists and key review points to distill product-optimization directions, supporting listing strategy and operating decisions.

Product managers and competitive analysts

Continuously scrape competitors' feature updates and announcement pages into comparison lists that support roadmap planning and positioning adjustments.

Researchers and educators

Collect public data and literature pages into structured, annotated output for research projects and teaching cases.

Customer support and knowledge-base operations

Periodically scrape FAQs and update announcements and archive them in structured form, improving knowledge-base coverage and self-service resolution rates.

Recruiting and talent-market analysis

Collect job postings and skill requirements, summarize trends, and provide a basis for talent profiling and training plans.

Problems solved

Turns "a given URL plus a desired output language" into a ready-to-run web-scraping script with clear instructions, targeting high-frequency scenarios in growth, operations, product, competitive research, and data analysis. It helps users complete web data collection, basic cleaning, and structuring within minutes, substantially lowering the technical barrier, improving delivery reliability, and providing dependable data input for subsequent analysis, visualization, and reporting.

Feature summary

Generates a runnable web-scraping script from a single URL, so collection can start right away.
Automatically recognizes page structure and content blocks, flexibly extracting titles, text, links, and other key information.
Scraping scope and rules can be customized on demand, avoiding redundant information and focusing on the data the business needs.
Produces clear step-by-step instructions and usage guides so non-technical colleagues can get started quickly.
Includes data-cleaning and preprocessing suggestions, so collected data is ready for analysis and reporting.
Supports multilingual output and comments, easing cross-team collaboration and external delivery.
Provides guidance on errors and anti-scraping issues, improving collection stability and task completion rates.
Can be templated and reused: swap URLs and parameters in bulk to extend to multiple sites.
Tied closely to marketing, competitive-research, and content-operations scenarios, directly supporting growth and decision-making.

How to use a purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suitable for quick personal trials and lightweight scenarios.

2. Publish it as an API endpoint

Turn the prompt template into an API so your program can modify the template parameters and call it through the interface, enabling automation and batch processing. Suitable for developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so that your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts work seamlessly across different AI tools.

Price: ¥20.00
The platform offers a free-trial mechanism so you can confirm the results meet your expectations before paying.

What you get after purchase

The complete prompt template
- 250 tokens in total
- 2 adjustable parameters
{ 输入URL } (input URL), { 输出语言 } (output language)
Automatically added to "My prompt library"
- Prompt-optimizer support
- Versioned management
Community-shared application examples (free for a limited time)
