Writing a Data-Scraping Script

Updated Sep 27, 2025

Use Beautiful Soup to write a Python script that extracts data from a given URL.

The following script uses requests and BeautifulSoup to scrape product data from the specified promotion page and performs basic preprocessing (text cleaning, price parsing, field standardization), deduplication, and structured output (CSV, JSON). It includes a robots.txt compliance check, retries with rate limiting, and multi-strategy parsing (page cards, Microdata, JSON-LD) to stay robust against unknown front-end structures. It can be run as-is and saves to the current directory by default.

Usage (brief):

  • Dependencies: Python 3.8+, requests, beautifulsoup4
  • Install: pip install requests beautifulsoup4
  • Run: python scrape_promo.py
  • Output: promo_summer_2025_products.csv, promo_summer_2025_products.json

Script (scrape_promo.py):

import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import logging
import time
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

PROMO_URL = "https://shop.testsite.cn/promo/summer-2025"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
REQUEST_TIMEOUT = 10
RATE_LIMIT_SECONDS = 1.0  # basic rate limiting to avoid requesting too fast

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

def setup_session():
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET"])
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    })
    return session

def is_allowed_by_robots(url, user_agent=USER_AGENT, session=None):
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    try:
        # Read robots.txt
        if session:
            resp = session.get(robots_url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                rp.parse(resp.text.splitlines())
            else:
                # robots.txt unavailable: default to allow, but proceed with caution
                logging.warning("Could not fetch robots.txt, defaulting to allow: %s", robots_url)
                return True
        else:
            rp.set_url(robots_url)
            rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception as e:
        logging.warning("Error while checking robots.txt: %s", e)
        return True

def fetch_html(url, session):
    time.sleep(RATE_LIMIT_SECONDS)
    resp = session.get(url, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    # Pick a sensible encoding
    if not resp.encoding or resp.encoding.lower() == "iso-8859-1":
        resp.encoding = resp.apparent_encoding or "utf-8"
    return resp.text

def clean_text(text):
    if text is None:
        return None
    # Collapse extra whitespace and invisible characters
    return re.sub(r"\s+", " ", text).strip()

def parse_price(text):
    """
    Parse a price string and return (value: float, currency: str or None).
    Supported formats include: ¥1,299.00, ¥899, 1299 元, CNY 499
    """
    if not text:
        return None, None
    t = clean_text(text)
    # Match an optional currency marker followed by the numeric value
    m = re.search(
        r"(?P<currency>¥|¥|CNY|RMB|元)?\s*"
        r"(?P<value>\d{1,3}(?:[,\s]\d{3})*(?:\.\d+)?|\d+(?:\.\d+)?)",
        t
    )
    if not m:
        return None, None
    raw_val = m.group("value").replace(",", "").replace(" ", "")
    try:
        val = float(raw_val)
    except Exception:
        val = None
    cur = m.group("currency")
    # Normalize the currency marker
    if cur in ["¥", "¥", "元"]:
        cur = "CNY"
    return val, cur

def parse_discount(text):
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d{1,3})\s*%", t)
    if m:
        pct = int(m.group(1))
        if 0 < pct <= 100:
            return pct / 100.0
    return None

def parse_rating(text):
    # Supports formats such as "4.5/5", "评分 4.2", "4.0 星"
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d+(?:\.\d+)?)\s*(?:/5|星|stars)", t, flags=re.IGNORECASE)
    return float(m.group(1)) if m else None

def extract_jsonld_products(soup, base_url):
    products = []
    for tag in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(tag.string or tag.get_text())
        except Exception:
            continue
        items = []
        if isinstance(data, list):
            items = data
        elif isinstance(data, dict):
            # For an ItemList, expand itemListElement
            if data.get("@type") == "ItemList" and "itemListElement" in data:
                for el in data["itemListElement"]:
                    # A list entry may be a ListItem; take its nested item
                    obj = el.get("item") if isinstance(el, dict) else el
                    if obj:
                        items.append(obj)
            else:
                items = [data]
        for obj in items:
            if not isinstance(obj, dict):
                continue
            typ = obj.get("@type")
            if isinstance(typ, list):
                is_product = "Product" in typ
            else:
                is_product = typ == "Product"
            if not is_product:
                continue
            name = clean_text(obj.get("name"))
            url = obj.get("url")
            image = obj.get("image")
            description = clean_text(obj.get("description"))
            sku = obj.get("sku") or obj.get("mpn")
            brand = None
            if isinstance(obj.get("brand"), dict):
                brand = obj["brand"].get("name")
            offers = obj.get("offers")
            price = None
            currency = None
            original_price = None
            availability = None
            if isinstance(offers, list) and offers:
                off = offers[0]
                price, currency, availability = off.get("price"), off.get("priceCurrency"), off.get("availability")
            elif isinstance(offers, dict):
                price, currency, availability = offers.get("price"), offers.get("priceCurrency"), offers.get("availability")
            # Convert the price to a number
            price_val = float(price) if isinstance(price, (int, float, str)) and re.match(r"^\d+(\.\d+)?$", str(price)) else None
            # Normalize product and image URLs
            url = urljoin(base_url, url) if url else None
            image = urljoin(base_url, image) if isinstance(image, str) else image
            products.append({
                "name": name,
                "product_url": url,
                "image_url": image,
                "sku": sku,
                "brand": brand,
                "price": price_val,
                "currency": currency,
                "original_price": original_price,
                "discount_rate": None,
                "stock_status": availability,
                "rating": None,
                "review_count": None,
                "description": description,
                "source": "jsonld"
            })
    return products

def find_product_cards(soup):
    # Multiple selector strategies for likely product cards
    selectors = [
        'div[class*="product"]', 'li[class*="product"]', 'article[class*="product"]',
        'div[class*="item"]', 'li[class*="item"]',
        'div[class*="card"]', 'li[class*="card"]'
    ]
    cards = []
    seen = set()
    for sel in selectors:
        for el in soup.select(sel):
            # Skip elements that were already collected
            oid = id(el)
            if oid not in seen:
                seen.add(oid)
                cards.append(el)
    return cards

def first_match_text(el, selectors_or_class_keywords):
    # selectors_or_class_keywords may be a list of CSS selectors or of class-name keywords
    if isinstance(selectors_or_class_keywords, (list, tuple)):
        # Try each entry as a CSS selector first
        for sel in selectors_or_class_keywords:
            found = el.select_one(sel)
            if found and clean_text(found.get_text()):
                return clean_text(found.get_text())
        # Then try each entry as a class-name keyword
        for kw in selectors_or_class_keywords:
            for tag in el.find_all(
                True,
                class_=lambda c: (c and kw in " ".join(c)) if isinstance(c, list) else (c and kw in c)
            ):
                txt = clean_text(tag.get_text())
                if txt:
                    return txt
    return None

def get_attr_chain(el, attrs):
    for a in attrs:
        v = el.get(a)
        if v:
            return v
    return None

def parse_product_card(card, base_url):
    # Name
    name = None
    name_selectors = ['h1', 'h2', 'h3', 'a[class*="title"]', 'a[class*="name"]',
                      'div[class*="title"]', 'span[class*="title"]']
    for sel in name_selectors:
        tag = card.select_one(sel)
        if tag:
            txt = clean_text(tag.get_text())
            if txt:
                name = txt
                break
    if not name:
        # Fall back to the image alt text
        img = card.find("img")
        if img:
            alt = clean_text(img.get("alt"))
            if alt:
                name = alt

    # Product link
    product_url = None
    a = card.find("a", href=True)
    if a:
        href = a.get("href")
        if href and not href.startswith("#"):
            product_url = urljoin(base_url, href)

    # Image
    image_url = None
    img = card.find("img")
    if img:
        image_url = get_attr_chain(img, ["data-src", "data-original", "src"])
        if image_url:
            image_url = urljoin(base_url, image_url)

    # SKU
    sku = get_attr_chain(card, ["data-sku", "data-id"])
    if not sku:
        m = re.search(r"(SKU|货号)\s*[::]\s*([A-Za-z0-9\-_]+)", card.get_text())
        if m:
            sku = m.group(2)

    # Price fields
    price = None
    currency = None
    original_price = None

    # Prefer obvious price blocks
    price_blocks = card.find_all(True, class_=lambda c: (c and (
        ("price" in " ".join(c)) or ("sale" in " ".join(c)) or ("final" in " ".join(c))
    )) if isinstance(c, list) else (c and ("price" in c or "sale" in c or "final" in c)))
    if price_blocks:
        # Current price
        for pb in price_blocks:
            val, cur = parse_price(pb.get_text())
            if val:
                price, currency = val, cur or currency
                break
        # Original price (often struck through or labelled was/list)
        orig_blocks = card.find_all(["del", "s", "strike"]) + card.find_all(
            True, class_=lambda c: c and ("original" in c or "was" in c or "list" in c))
        for ob in orig_blocks:
            val, cur = parse_price(ob.get_text())
            if val:
                original_price = val
                if not currency and cur:
                    currency = cur
                break

    # Discount
    discount_rate = None
    disc_blocks = card.find_all(True, class_=lambda c: c and ("discount" in c or "off" in c))
    for db in disc_blocks:
        dr = parse_discount(db.get_text())
        if dr:
            discount_rate = dr
            break
    # If no explicit discount was found but both prices exist, derive the discount
    if not discount_rate and original_price and price and original_price > 0 and price <= original_price:
        discount_rate = 1.0 - (price / original_price)

    # Rating and review count
    rating = None
    review_count = None
    rating_blocks = card.find_all(True, class_=lambda c: c and ("rating" in c or "stars" in c))
    for rb in rating_blocks:
        r = parse_rating(rb.get_text())
        if r:
            rating = r
            break
    # Review count
    m = re.search(r"(\d+)\s*(条评|评论|reviews)", card.get_text(), flags=re.IGNORECASE)
    if m:
        review_count = int(m.group(1))

    # Stock status
    stock_status = None
    text_all = card.get_text()
    if re.search(r"(有货|现货|在库)", text_all):
        stock_status = "InStock"
    elif re.search(r"(缺货|售罄|无货|OutOfStock)", text_all, flags=re.IGNORECASE):
        stock_status = "OutOfStock"

    return {
        "name": name,
        "product_url": product_url,
        "image_url": image_url,
        "sku": sku,
        "brand": None,
        "price": price,
        "currency": currency,
        "original_price": original_price,
        "discount_rate": discount_rate,
        "stock_status": stock_status,
        "rating": rating,
        "review_count": review_count,
        "description": None,
        "source": "card"
    }

def extract_microdata_products(soup, base_url):
    products = []
    for prod in soup.find_all(attrs={"itemscope": True, "itemtype": re.compile("Product", re.IGNORECASE)}):
        def get_itemprop(prop):
            tag = prod.find(attrs={"itemprop": prop})
            return clean_text(tag.get_text()) if tag and tag.get_text() else (tag.get("content") if tag else None)

        name = get_itemprop("name")
        url = get_itemprop("url")
        image = get_itemprop("image")
        sku = get_itemprop("sku") or get_itemprop("mpn")
        price = get_itemprop("price")
        currency = get_itemprop("priceCurrency")
        # Convert the price to a number
        price_val = float(price) if price and re.match(r"^\d+(\.\d+)?$", str(price)) else None
        products.append({
            "name": name,
            "product_url": urljoin(base_url, url) if url else None,
            "image_url": urljoin(base_url, image) if image else None,
            "sku": sku,
            "brand": get_itemprop("brand"),
            "price": price_val,
            "currency": currency or "CNY",
            "original_price": None,
            "discount_rate": None,
            "stock_status": None,
            "rating": None,
            "review_count": None,
            "description": get_itemprop("description"),
            "source": "microdata"
        })
    return products

def deduplicate(products):
    seen = set()
    out = []
    for p in products:
        key = (
            (p.get("sku") or "").strip().lower(),
            (p.get("product_url") or "").strip().lower()
        )
        if key in seen:
            continue
        # Weak dedup: if both sku and url are empty, approximate by name + price
        if not key[0] and not key[1]:
            alt_key = ((p.get("name") or "").strip().lower(), str(p.get("price") or ""))
            if alt_key in seen:
                continue
            seen.add(alt_key)
        else:
            seen.add(key)
        out.append(p)
    return out

def normalize(products):
    """
    Preprocessing: clean text, normalize the currency, and drop unnamed items.
    """
    out = []
    for p in products:
        if not p.get("name"):
            continue
        p["name"] = clean_text(p["name"])
        if p.get("currency") in ["¥", "¥", "元", None]:
            p["currency"] = "CNY"
        # Ensure the price is a float or None
        if isinstance(p.get("price"), str):
            val, cur = parse_price(p["price"])
            p["price"] = val
            p["currency"] = p["currency"] or cur or "CNY"
        out.append(p)
    return out

def save_csv(products, path):
    fields = ["name", "product_url", "image_url", "sku", "brand", "price", "currency",
              "original_price", "discount_rate", "stock_status", "rating", "review_count",
              "description", "source"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for p in products:
            writer.writerow({k: p.get(k) for k in fields})

def save_json(products, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(products, f, ensure_ascii=False, indent=2)

def main():
    session = setup_session()

    if not is_allowed_by_robots(PROMO_URL, USER_AGENT, session):
        logging.error("robots.txt disallows crawling this URL: %s", PROMO_URL)
        return

    logging.info("Starting crawl: %s", PROMO_URL)
    html = fetch_html(PROMO_URL, session)
    soup = BeautifulSoup(html, "html.parser")

    # Multi-strategy extraction
    products = []
    products += extract_jsonld_products(soup, PROMO_URL)
    products += extract_microdata_products(soup, PROMO_URL)

    # Card-based parsing
    cards = find_product_cards(soup)
    logging.info("Candidate product cards found: %d", len(cards))
    for card in cards:
        p = parse_product_card(card, PROMO_URL)
        # Require at least a name or a link
        if p.get("name") or p.get("product_url"):
            products.append(p)

    # Preprocess and deduplicate
    products = normalize(products)
    products = deduplicate(products)

    logging.info("Valid products after deduplication: %d", len(products))
    if not products:
        logging.warning("No products extracted; the page may be rendered dynamically or the selectors may need adjustment.")
    else:
        save_csv(products, "promo_summer_2025_products.csv")
        save_json(products, "promo_summer_2025_products.json")
        logging.info("Saved: promo_summer_2025_products.csv, promo_summer_2025_products.json")

if __name__ == "__main__":
    main()

Technical notes:

  • The parsing strategy prefers structured data (JSON-LD, Microdata) and falls back to generic card selectors plus regex extraction, which suits unknown or changing page structures.
  • Price parsing tolerates format variations and normalizes the currency (unified to CNY), which simplifies downstream modeling such as price-distribution analysis or promotion-effect evaluation.
  • Deduplication keys on SKU and URL first, and falls back to an approximate name + price key when those are missing, reducing duplicate samples in later analysis.
  • The output covers the core fields (name, link, image, SKU, price, discount, stock, rating, and so on) and can be fed directly into a data-analysis pipeline. For further analysis, load the data and then impute missing values, handle outliers, or segment products by discount rate or stock status; a minimal sketch follows below.
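
As a minimal, illustrative sketch (it assumes the promo_summer_2025_products.json file produced above exists in the current directory; the summary statistics shown are arbitrary examples, not part of the script itself), the structured output could be inspected like this:

import json
from statistics import mean, median

# Load the structured output produced by scrape_promo.py (assumed filename).
with open("promo_summer_2025_products.json", encoding="utf-8") as f:
    products = json.load(f)

# Keep only records with a usable numeric price.
priced = [p for p in products if isinstance(p.get("price"), (int, float))]
discounted = [p for p in priced if p.get("discount_rate")]

print(f"products: {len(products)}, with price: {len(priced)}, discounted: {len(discounted)}")
if priced:
    prices = [p["price"] for p in priced]
    print(f"price: mean={mean(prices):.2f}, median={median(prices):.2f}")
if discounted:
    rates = [p["discount_rate"] for p in discounted]
    print(f"discount rate: mean={mean(rates):.2%}, max={max(rates):.2%}")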

The following script uses requests and Beautiful Soup to scrape news data from a specified section page. It includes basic preprocessing (text cleaning, deduplication), pagination and rate control, a robots.txt compliance check, and optional detail-page enrichment. Output is supported as CSV and JSONL, ready to feed later data-mining steps (feature engineering, modeling).

Notes and assumptions:

  • Because the site structure is unknown, the script combines several groups of generic selectors with fallback strategies to extract the title, link, publication time, author, and summary wherever possible.
  • The script checks and respects robots.txt; make sure your scraping complies with the target site's terms of use and applicable laws and regulations.
  • If the page relies on JavaScript for rendering, switch to a rendering-based approach (such as Playwright or Selenium); this script handles static HTML only (see the sketch after this list).
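
As a hedged sketch only (it assumes Playwright is installed separately via pip install playwright followed by playwright install chromium; the URL is the same placeholder used elsewhere on this page), rendered HTML could be fetched and handed to the same parsing code in the script below:

# Sketch: fetch JavaScript-rendered HTML with Playwright, then reuse the BeautifulSoup parsing below.
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup

def fetch_rendered_html(url: str, timeout_ms: int = 30000) -> str:
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, timeout=timeout_ms, wait_until="networkidle")
        html = page.content()
        browser.close()
        return html

# html = fetch_rendered_html("https://news.example.net/industry/marketing-insights")
# soup = BeautifulSoup(html, "html.parser")  # then pass soup to find_article_containers(...) below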

How to use:

  1. Install dependencies: pip install requests beautifulsoup4
  2. Run the basic example: python scrape_marketing_insights.py
     Optional arguments example: python scrape_marketing_insights.py --url https://news.example.net/industry/marketing-insights --output-csv insights.csv --output-jsonl insights.jsonl --max-pages 3 --detail

Python script (save as scrape_marketing_insights.py):

import argparse
import csv
import json
import logging
import random
import re
import sys
import time
from dataclasses import dataclass, asdict
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag

# -----------------------
# Configuration and helper functions
# -----------------------

USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0 Safari/537.36"
DEFAULT_URL = "https://news.example.net/industry/marketing-insights"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

@dataclass
class Article:
    title: str
    url: str
    published_at: Optional[str]
    author: Optional[str]
    summary: Optional[str]
    source: str

def normalize_whitespace(text: Optional[str]) -> Optional[str]:
    if text is None:
        return None
    return re.sub(r"\s+", " ", text).strip()

def build_robot_parser(start_url: str) -> RobotFileParser:
    parsed = urlparse(start_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    rp.set_url(robots_url)
    try:
        rp.read()
        logging.info(f"robots.txt 加载完成: {robots_url}")
    except Exception as e:
        logging.warning(f"robots.txt 加载失败: {robots_url} -> {e}")
    return rp

def can_fetch(rp: RobotFileParser, url: str, user_agent: str = USER_AGENT) -> bool:
    # If robots.txt was never successfully read, RobotFileParser would report every URL
    # as disallowed; in that case fall back to allowing the fetch (use with care).
    if not rp.last_checked and not rp.allow_all and not rp.disallow_all:
        return True
    try:
        return rp.can_fetch(user_agent, url)
    except Exception:
        return True

def get_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    })
    return s

# -----------------------
# 解析函数(列表页与详情页)
# -----------------------

def find_article_containers(soup: BeautifulSoup) -> List[Tag]:
    # A combination of generic selectors to cover as many site structures as possible
    containers: List[Tag] = []
    containers.extend(soup.find_all("article"))
    containers.extend(soup.select('div[itemtype*="NewsArticle"], div[itemtype*="Article"]'))
    for cls in ["news", "article", "post", "story", "card", "entry"]:
        containers.extend(soup.select(f"div[class*='{cls}'], li[class*='{cls}']"))
    # Deduplicate: approximate identity via tag name plus source position (sourceline/sourcepos)
    seen_ids = set()
    unique = []
    for tag in containers:
        key = (tag.name, getattr(tag, "sourceline", None), getattr(tag, "sourcepos", None))
        if key not in seen_ids:
            seen_ids.add(key)
            unique.append(tag)
    return unique

def extract_title_and_url(container: Tag, base_url: str) -> Tuple[Optional[str], Optional[str]]:
    # Prefer extracting from heading tags
    title_tag = None
    for sel in ["h1 a", "h2 a", "h3 a", "h1", "h2", "h3", "a"]:
        title_tag = container.select_one(sel)
        if title_tag:
            break
    title = None
    url = None
    if title_tag:
        if title_tag.name == "a":
            title = title_tag.get_text(strip=True) or None
            href = title_tag.get("href")
            if href:
                url = urljoin(base_url, href)
        else:
            title = title_tag.get_text(strip=True) or None
            a = title_tag.find("a", href=True)
            if a and a.get("href"):
                url = urljoin(base_url, a["href"])
    # Fallback: any clickable link
    if not url:
        a = container.find("a", href=True)
        if a and a.get_text(strip=True):
            url = urljoin(base_url, a["href"])
            title = title or a.get_text(strip=True)
    return normalize_whitespace(title), url

def extract_published_at(container: Tag) -> Optional[str]:
    # <time> tag
    t = container.find("time")
    if t:
        dt = t.get("datetime") or t.get_text(strip=True)
        if dt:
            return normalize_whitespace(dt)
    # meta tags (structured data)
    for sel in [
        'meta[itemprop="datePublished"]',
        'meta[property="article:published_time"]',
        'meta[name="pubdate"]',
        'span[itemprop="datePublished"]',
    ]:
        m = container.select_one(sel)
        if m:
            content = m.get("content") or m.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None

def extract_author(container: Tag) -> Optional[str]:
    for sel in [
        ".author", ".byline", '[itemprop="author"]', 'a[rel="author"]', 'meta[name="author"]',
    ]:
        el = container.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None

def extract_summary(container: Tag) -> Optional[str]:
    for sel in [".summary", ".dek", ".abstract", "p"]:
        el = container.select_one(sel)
        if el:
            text = el.get_text(" ", strip=True)
            if text and len(text.split()) >= 3:
                return normalize_whitespace(text)
    return None

def parse_detail_page(soup: BeautifulSoup) -> dict:
    # Extract cleaner fields from the detail page (when present)
    data = {"title": None, "published_at": None, "author": None, "summary": None}
    # Title
    title_tag = soup.find(["h1", "title"])
    if title_tag:
        data["title"] = normalize_whitespace(title_tag.get_text(strip=True))
    og_title = soup.select_one('meta[property="og:title"]')
    if og_title and og_title.get("content"):
        data["title"] = normalize_whitespace(og_title["content"]) or data["title"]
    # Summary
    og_desc = soup.select_one('meta[property="og:description"], meta[name="description"]')
    if og_desc and og_desc.get("content"):
        data["summary"] = normalize_whitespace(og_desc["content"])
    # Published time
    for sel in [
        'meta[property="article:published_time"]',
        'meta[itemprop="datePublished"]',
        'time[datetime]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") or el.get("datetime") or el.get_text(strip=True)
            if content:
                data["published_at"] = normalize_whitespace(content)
                break
    # Author
    for sel in [
        'meta[name="author"]',
        '[itemprop="author"]',
        '.author', '.byline', 'a[rel="author"]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                data["author"] = normalize_whitespace(content)
                break
    # JSON-LD (optional enrichment)
    for script in soup.find_all("script", type="application/ld+json"):
        try:
            payload = json.loads(script.string or "")
        except Exception:
            continue
        objs = payload if isinstance(payload, list) else [payload]
        for obj in objs:
            if not isinstance(obj, dict):
                continue
            t = obj.get("@type") or (obj.get("@graph") or [{}])[0].get("@type")
            if isinstance(t, list):
                t = ",".join(t)
            if t and ("NewsArticle" in t or "Article" in t):
                data["title"] = normalize_whitespace(obj.get("headline")) or data["title"]
                data["summary"] = normalize_whitespace(obj.get("description")) or data["summary"]
                pub = obj.get("datePublished") or obj.get("dateCreated")
                if pub:
                    data["published_at"] = normalize_whitespace(pub)
                author = obj.get("author")
                if isinstance(author, dict):
                    data["author"] = normalize_whitespace(author.get("name")) or data["author"]
                elif isinstance(author, list) and author:
                    if isinstance(author[0], dict):
                        data["author"] = normalize_whitespace(author[0].get("name")) or data["author"]
                    elif isinstance(author[0], str):
                        data["author"] = normalize_whitespace(author[0]) or data["author"]
                break
    return data

# -----------------------
# Main scraping flow
# -----------------------

def scrape_list(start_url: str,
                max_pages: int = 1,
                sleep_min: float = 1.0,
                sleep_max: float = 2.5,
                fetch_detail: bool = False) -> List[Article]:
    session = get_session()
    rp = build_robot_parser(start_url)
    articles: List[Article] = []
    seen_urls: Set[str] = set()

    def fetch(url: str) -> Optional[BeautifulSoup]:
        if not can_fetch(rp, url):
            logging.warning(f"robots 不允许抓取:{url}")
            return None
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code != 200:
                logging.warning(f"请求失败 {resp.status_code}: {url}")
                return None
            return BeautifulSoup(resp.text, "html.parser")
        except requests.RequestException as e:
            logging.error(f"请求异常: {url} -> {e}")
            return None

    page_url = start_url
    base = f"{urlparse(start_url).scheme}://{urlparse(start_url).netloc}"

    for page_idx in range(max_pages):
        soup = fetch(page_url)
        if soup is None:
            break

        containers = find_article_containers(soup)
        if not containers:
            logging.info("未找到文章容器,尝试使用更宽松的链接抽取")
            # 宽松抽取:从主体区块抽取链接
            for a in soup.select("main a[href], .content a[href], .container a[href]"):
                title = normalize_whitespace(a.get_text(strip=True))
                if not title or len(title) < 5:
                    continue
                url = urljoin(base, a.get("href"))
                if url in seen_urls:
                    continue
                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=None,
                    author=None,
                    summary=None,
                    source=start_url,
                ))
                seen_urls.add(url)
        else:
            for c in containers:
                title, url = extract_title_and_url(c, base)
                if not url or not title:
                    continue
                if url in seen_urls:
                    continue
                published_at = extract_published_at(c)
                author = extract_author(c)
                summary = extract_summary(c)

                # Optional detail-page enrichment
                if fetch_detail and can_fetch(rp, url):
                    time.sleep(random.uniform(sleep_min, sleep_max))
                    dsoup = fetch(url)
                    if dsoup:
                        detail = parse_detail_page(dsoup)
                        title = detail["title"] or title
                        summary = detail["summary"] or summary
                        published_at = detail["published_at"] or published_at
                        author = detail["author"] or author

                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=published_at,
                    author=author,
                    summary=summary,
                    source=start_url,
                ))
                seen_urls.add(url)

        # Pagination: find the next-page link
        next_link = soup.select_one('a[rel="next"], a.next, a[aria-label*="下一页"], a[aria-label*="Next"], li.next a')
        if next_link and next_link.get("href"):
            next_url = urljoin(base, next_link.get("href"))
            if next_url == page_url:
                break
            page_url = next_url
            logging.info(f"跳转下一页: {page_url}")
            time.sleep(random.uniform(sleep_min, sleep_max))
        else:
            break

    return articles

# -----------------------
# Output and CLI
# -----------------------

def write_csv(path: str, items: List[Article]) -> None:
    with open(path, "w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["title", "url", "published_at", "author", "summary", "source"])
        w.writeheader()
        for it in items:
            w.writerow(asdict(it))

def write_jsonl(path: str, items: List[Article]) -> None:
    with open(path, "w", encoding="utf-8") as f:
        for it in items:
            f.write(json.dumps(asdict(it), ensure_ascii=False) + "\n")

def main():
    parser = argparse.ArgumentParser(description="Marketing Insights scraper (BeautifulSoup)")
    parser.add_argument("--url", type=str, default=DEFAULT_URL, help="Starting list-page URL")
    parser.add_argument("--max-pages", type=int, default=1, help="Maximum number of list pages to crawl")
    parser.add_argument("--sleep-min", type=float, default=1.0, help="Minimum delay between requests (seconds)")
    parser.add_argument("--sleep-max", type=float, default=2.5, help="Maximum delay between requests (seconds)")
    parser.add_argument("--detail", action="store_true", help="Also fetch detail pages to fill in missing fields")
    parser.add_argument("--output-csv", type=str, default=None, help="CSV output path")
    parser.add_argument("--output-jsonl", type=str, default=None, help="JSONL output path")
    args = parser.parse_args()

    items = scrape_list(
        start_url=args.url,
        max_pages=args.max_pages,
        sleep_min=args.sleep_min,
        sleep_max=args.sleep_max,
        fetch_detail=args.detail,
    )

    logging.info(f"抓取完成,共 {len(items)} 条")
    if args.output_csv:
        write_csv(args.output_csv, items)
        logging.info(f"CSV 已写入: {args.output_csv}")
    if args.output_jsonl:
        write_jsonl(args.output_jsonl, items)
        logging.info(f"JSONL 已写入: {args.output_jsonl}")

    # If no output file is specified, print the first few records as a sample
    if not args.output_csv and not args.output_jsonl:
        for it in items[:5]:
            print(json.dumps(asdict(it), ensure_ascii=False))

if __name__ == "__main__":
    main()
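
Beyond the CLI, scrape_list can also be imported and called from another script. A minimal sketch (assuming the code above is saved as scrape_marketing_insights.py on the import path; the URL and page count are placeholders):

# Sketch: programmatic use of the scraper defined above.
from scrape_marketing_insights import scrape_list, write_jsonl

articles = scrape_list(
    start_url="https://news.example.net/industry/marketing-insights",
    max_pages=2,
    fetch_detail=False,
)
print(f"Collected {len(articles)} articles")
write_jsonl("insights.jsonl", articles)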

Data-mining suggestions (processing steps once the data is collected):

  • Field standardization: convert published_at to ISO 8601; normalize author names and disambiguate entities (a minimal sketch follows after this list).
  • Deduplication: run a second pass keyed on URL and on the combination of title + publication time.
  • Content-quality filtering: apply a summary-length threshold, keyword filters (e.g. "营销", "洞察"), and language detection to improve sample purity.
  • Structured enrichment: where available, prefer the page's structured data (JSON-LD) to fill in missing fields.
  • Compliance and rate control: respect robots.txt and the site's terms; use reasonable request intervals and concurrency limits to avoid putting pressure on the site.
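
A minimal post-processing sketch along these lines (the insights.jsonl filename and the set of date formats handled are assumptions; real feeds may need more patterns or a dedicated date parser):

import json
from datetime import datetime

# Candidate formats to try when normalizing published_at to ISO 8601 (assumed set).
DATE_FORMATS = ["%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y年%m月%d日"]

def to_iso8601(value):
    if not value:
        return None
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(value.strip(), fmt).isoformat()
        except ValueError:
            continue
    return None  # leave unparseable dates for manual review

# Load the JSONL output, normalize dates, then deduplicate by URL and by (title, published_at).
records, seen_urls, seen_title_date = [], set(), set()
with open("insights.jsonl", encoding="utf-8") as f:
    for line in f:
        item = json.loads(line)
        item["published_at"] = to_iso8601(item.get("published_at")) or item.get("published_at")
        td = (item.get("title"), item.get("published_at"))
        if item.get("url") in seen_urls or td in seen_title_date:
            continue
        seen_urls.add(item.get("url"))
        seen_title_date.add(td)
        records.append(item)

print(f"{len(records)} unique articles after normalization and deduplication")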

Example details

Problem solved

Turns a given URL plus a desired output language into a ready-to-run web-scraping script with clear operating instructions, in one step. It targets high-frequency scenarios such as growth, operations, product, competitor research, and data analysis, helping users complete web data collection, basic cleaning, and structuring within minutes. This lowers the technical barrier, makes delivery more predictable, and provides reliable data input for subsequent analysis, visualization, and reporting.

Target users

Growth and marketing operations

Quickly scrape competitor campaigns, prices, and copy into readable data to support campaign strategy and ad-spend optimization.

Data analysts

Collect source data in bulk and, combined with the cleaning and preprocessing suggestions, shorten the data-preparation cycle and speed up insight delivery.

Content editors and new-media teams

Aggregate industry news and source links into normalized output, improving topic-selection efficiency and content iteration speed.

Feature summary

  • Generate a runnable web-scraping script in one step: enter a URL and start collecting right away.
  • Automatically recognize page structure and content blocks, flexibly extracting titles, text, links, and other key information.
  • Customize the scraping scope and rules as needed, avoiding redundant information and focusing on the data the business actually needs.
  • Produce clear step-by-step instructions and usage guides so non-technical colleagues can get started quickly.
  • Built-in data cleaning and preprocessing suggestions make the collected data ready for analysis and reporting.
  • Support output and comments in multiple languages for cross-team collaboration and external delivery.
  • Provide guidance on errors and anti-scraping measures to improve stability and task completion rates.
  • Reusable as a template: swap URLs and parameters in bulk to extend easily to multiple sites.
  • Closely tied to marketing, competitor, and content-operations scenarios, directly supporting growth and decision-making.

How to use the purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use cases.

2. Publish it as an API and call it programmatically

Turn the prompt template into an API: your program can adjust the template parameters freely and call it through the endpoint, making automation and batch processing straightforward. Suitable for developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in an MCP client so that your AI application calls the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between different AI tools.

AI prompt price
¥20.00
Try before you buy: pay only after it has worked for you.

What you get after purchase

The complete prompt template
- 250 tokens in total
- 2 adjustable parameters
{ 输入URL } { 输出语言 }
Usage rights to community-contributed content
- Curated community examples to help you get started with the prompt quickly
Free for a limited time
