Write a Python script with Beautiful Soup to extract data from a specified URL.
The following script uses requests and BeautifulSoup to scrape product data from the specified promotion page and performs basic preprocessing (text cleaning, price parsing, field normalization), deduplication, and structured output (CSV and JSON). It includes a robots.txt compliance check, retries with rate limiting, and multi-strategy parsing (page cards, Microdata, JSON-LD) to stay robust against unknown front-end structures. It can be run as-is and saves to the current directory by default.

Usage notes (brief):
- Dependencies: Python 3.8+, requests, beautifulsoup4
- Install: pip install requests beautifulsoup4
- Run: python scrape_promo.py
- Output: promo_summer_2025_products.csv, promo_summer_2025_products.json

Script (scrape_promo.py):

```python
import requests
from bs4 import BeautifulSoup
import re
import json
import csv
import logging
import time
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

PROMO_URL = "https://shop.testsite.cn/promo/summer-2025"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36"
REQUEST_TIMEOUT = 10
RATE_LIMIT_SECONDS = 1.0  # basic rate limit to avoid overly fast requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)


def setup_session():
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=frozenset(["GET"])
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    })
    return session


def is_allowed_by_robots(url, user_agent=USER_AGENT, session=None):
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    try:
        # Read robots.txt
        if session:
            resp = session.get(robots_url, timeout=REQUEST_TIMEOUT)
            if resp.status_code == 200:
                rp.parse(resp.text.splitlines())
            else:
                # robots.txt unavailable: default to allowed, but proceed with caution
                logging.warning("Could not fetch robots.txt; treating as allowed: %s", robots_url)
                return True
        else:
            rp.set_url(robots_url)
            rp.read()
        return rp.can_fetch(user_agent, url)
    except Exception as e:
        logging.warning("Error while checking robots.txt: %s", e)
        return True


def fetch_html(url, session):
    time.sleep(RATE_LIMIT_SECONDS)
    resp = session.get(url, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    # Try to pick a sensible encoding
    if not resp.encoding or resp.encoding.lower() == "iso-8859-1":
        resp.encoding = resp.apparent_encoding or "utf-8"
    return resp.text


def clean_text(text):
    if text is None:
        return None
    # Collapse extra whitespace and invisible characters
    return re.sub(r"\s+", " ", text).strip()


def parse_price(text):
    """
    Parse a price string and return (value: float, currency: str or None).
    Supported formats include: ¥1,299.00, ¥899, 1299 元, CNY 499
    """
    if not text:
        return None, None
    t = clean_text(text)
    # Match the numeric price first
    m = re.search(r"(?P<currency>¥|¥|CNY|RMB|元)?\s*(?P<value>\d{1,3}(?:[,\s]\d{3})*(?:\.\d+)?|\d+(?:\.\d+)?)", t)
    if not m:
        return None, None
    raw_val = m.group("value").replace(",", "").replace(" ", "")
    try:
        val = float(raw_val)
    except Exception:
        val = None
    cur = m.group("currency")
    # Normalize the currency symbol
    if cur in ["¥", "¥", "元"]:
        cur = "CNY"
    return val, cur


def parse_discount(text):
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d{1,3})\s*%", t)
    if m:
        pct = int(m.group(1))
        if 0 < pct <= 100:
            return pct / 100.0
    return None


def parse_rating(text):
    # Supports "4.5/5", "评分 4.2", "4.0 星", etc.
    if not text:
        return None
    t = clean_text(text)
    m = re.search(r"(\d+(?:\.\d+)?)\s*(?:/5|星|stars)", t, flags=re.IGNORECASE)
    return float(m.group(1)) if m else None


def extract_jsonld_products(soup, base_url):
    products = []
    for tag in soup.find_all("script", type="application/ld+json"):
        try:
            data = json.loads(tag.string or tag.get_text())
        except Exception:
            continue
        items = []
        if isinstance(data, list):
            items = data
        elif isinstance(data, dict):
            # If it is an ItemList, expand itemListElement
            if data.get("@type") == "ItemList" and "itemListElement" in data:
                for el in data["itemListElement"]:
                    # List entries may be ListItem objects; take the nested item
                    obj = el.get("item") if isinstance(el, dict) else el
                    if obj:
                        items.append(obj)
            else:
                items = [data]
        for obj in items:
            if not isinstance(obj, dict):
                continue
            typ = obj.get("@type")
            if isinstance(typ, list):
                is_product = "Product" in typ
            else:
                is_product = typ == "Product"
            if not is_product:
                continue
            name = clean_text(obj.get("name"))
            url = obj.get("url")
            image = obj.get("image")
            description = clean_text(obj.get("description"))
            sku = obj.get("sku") or obj.get("mpn")
            brand = None
            if isinstance(obj.get("brand"), dict):
                brand = obj["brand"].get("name")
            offers = obj.get("offers")
            price = None
            currency = None
            original_price = None
            availability = None
            if isinstance(offers, list) and offers:
                off = offers[0]
                price, currency, availability = off.get("price"), off.get("priceCurrency"), off.get("availability")
            elif isinstance(offers, dict):
                price, currency, availability = offers.get("price"), offers.get("priceCurrency"), offers.get("availability")
            # Convert the price to a number
            price_val = float(price) if isinstance(price, (int, float, str)) and re.match(r"^\d+(\.\d+)?$", str(price)) else None
            # Normalize URL and image
            url = urljoin(base_url, url) if url else None
            image = urljoin(base_url, image) if isinstance(image, str) else image
            products.append({
                "name": name,
                "product_url": url,
                "image_url": image,
                "sku": sku,
                "brand": brand,
                "price": price_val,
                "currency": currency,
                "original_price": original_price,
                "discount_rate": None,
                "stock_status": availability,
                "rating": None,
                "review_count": None,
                "description": description,
                "source": "jsonld"
            })
    return products


def find_product_cards(soup):
    # Multiple selector strategies for likely product cards
    selectors = [
        'div[class*="product"]', 'li[class*="product"]', 'article[class*="product"]',
        'div[class*="item"]', 'li[class*="item"]',
        'div[class*="card"]', 'li[class*="card"]'
    ]
    cards = []
    seen = set()
    for sel in selectors:
        for el in soup.select(sel):
            # Deduplicate identical elements
            oid = id(el)
            if oid not in seen:
                seen.add(oid)
                cards.append(el)
    return cards


def first_match_text(el, selectors_or_class_keywords):
    # Accepts either a list of CSS selectors or a list of class-name keywords
    if isinstance(selectors_or_class_keywords, (list, tuple)):
        # Try them as CSS selectors first
        for sel in selectors_or_class_keywords:
            found = el.select_one(sel)
            if found and clean_text(found.get_text()):
                return clean_text(found.get_text())
        # Then try them as class-name keywords
        for kw in selectors_or_class_keywords:
            for tag in el.find_all(True, class_=lambda c: c and kw in " ".join(c) if isinstance(c, list) else (c and kw in c)):
                txt = clean_text(tag.get_text())
                if txt:
                    return txt
    return None


def get_attr_chain(el, attrs):
    for a in attrs:
        v = el.get(a)
        if v:
            return v
    return None


def parse_product_card(card, base_url):
    # Name
    name = None
    name_selectors = ['h1', 'h2', 'h3', 'a[class*="title"]', 'a[class*="name"]',
                      'div[class*="title"]', 'span[class*="title"]']
    for sel in name_selectors:
        tag = card.select_one(sel)
        if tag:
            txt = clean_text(tag.get_text())
            if txt:
                name = txt
                break
    if not name:
        # Fall back to the image alt text
        img = card.find("img")
        if img:
            alt = clean_text(img.get("alt"))
            if alt:
                name = alt
    # Product link
    product_url = None
    a = card.find("a", href=True)
    if a:
        href = a.get("href")
        if href and not href.startswith("#"):
            product_url = urljoin(base_url, href)
    # Image
    image_url = None
    img = card.find("img")
    if img:
        image_url = get_attr_chain(img, ["data-src", "data-original", "src"])
        if image_url:
            image_url = urljoin(base_url, image_url)
    # SKU
    sku = get_attr_chain(card, ["data-sku", "data-id"])
    if not sku:
        m = re.search(r"(SKU|货号)\s*[::]\s*([A-Za-z0-9\-_]+)", card.get_text())
        if m:
            sku = m.group(2)
    # Price-related fields
    price = None
    currency = None
    original_price = None
    # Prefer obvious price blocks
    price_blocks = card.find_all(True, class_=lambda c: c and (
        ("price" in " ".join(c)) or ("sale" in " ".join(c)) or ("final" in " ".join(c))
    ) if isinstance(c, list) else (c and ("price" in c or "sale" in c or "final" in c)))
    if price_blocks:
        # Current price
        for pb in price_blocks:
            val, cur = parse_price(pb.get_text())
            if val:
                price, currency = val, cur or currency
                break
    # Original price (often struck through or labeled was/list)
    orig_blocks = card.find_all(["del", "s", "strike"]) + card.find_all(
        True, class_=lambda c: c and ("original" in c or "was" in c or "list" in c))
    for ob in orig_blocks:
        val, cur = parse_price(ob.get_text())
        if val:
            original_price = val
            if not currency and cur:
                currency = cur
            break
    # Discount
    discount_rate = None
    disc_blocks = card.find_all(True, class_=lambda c: c and ("discount" in c or "off" in c))
    for db in disc_blocks:
        dr = parse_discount(db.get_text())
        if dr:
            discount_rate = dr
            break
    # If no explicit discount but both prices are present, compute it
    if not discount_rate and original_price and price and original_price > 0 and price <= original_price:
        discount_rate = 1.0 - (price / original_price)
    # Rating and review count
    rating = None
    review_count = None
    rating_blocks = card.find_all(True, class_=lambda c: c and ("rating" in c or "stars" in c))
    for rb in rating_blocks:
        r = parse_rating(rb.get_text())
        if r:
            rating = r
            break
    # Review count
    m = re.search(r"(\d+)\s*(条评|评论|reviews)", card.get_text(), flags=re.IGNORECASE)
    if m:
        review_count = int(m.group(1))
    # Stock status
    stock_status = None
    text_all = card.get_text()
    if re.search(r"(有货|现货|在库)", text_all):
        stock_status = "InStock"
    elif re.search(r"(缺货|售罄|无货|OutOfStock)", text_all, flags=re.IGNORECASE):
        stock_status = "OutOfStock"
    return {
        "name": name,
        "product_url": product_url,
        "image_url": image_url,
        "sku": sku,
        "brand": None,
        "price": price,
        "currency": currency,
        "original_price": original_price,
        "discount_rate": discount_rate,
        "stock_status": stock_status,
        "rating": rating,
        "review_count": review_count,
        "description": None,
        "source": "card"
    }


def extract_microdata_products(soup, base_url):
    products = []
    for prod in soup.find_all(attrs={"itemscope": True, "itemtype": re.compile("Product", re.IGNORECASE)}):
        def get_itemprop(prop):
            tag = prod.find(attrs={"itemprop": prop})
            return clean_text(tag.get_text()) if tag and tag.get_text() else (tag.get("content") if tag else None)
        name = get_itemprop("name")
        url = get_itemprop("url")
        image = get_itemprop("image")
        sku = get_itemprop("sku") or get_itemprop("mpn")
        price = get_itemprop("price")
        currency = get_itemprop("priceCurrency")
        # Convert the price to a number
        price_val = float(price) if price and re.match(r"^\d+(\.\d+)?$", str(price)) else None
        products.append({
            "name": name,
            "product_url": urljoin(base_url, url) if url else None,
            "image_url": urljoin(base_url, image) if image else None,
            "sku": sku,
            "brand": get_itemprop("brand"),
            "price": price_val,
            "currency": currency or "CNY",
            "original_price": None,
            "discount_rate": None,
            "stock_status": None,
            "rating": None,
            "review_count": None,
            "description": get_itemprop("description"),
            "source": "microdata"
        })
    return products


def deduplicate(products):
    seen = set()
    out = []
    for p in products:
        key = (
            (p.get("sku") or "").strip().lower(),
            (p.get("product_url") or "").strip().lower()
        )
        if key in seen:
            continue
        # Weak dedup: if both sku and url are missing, fall back to name + price
        if not key[0] and not key[1]:
            alt_key = ((p.get("name") or "").strip().lower(), str(p.get("price") or ""))
            if alt_key in seen:
                continue
            seen.add(alt_key)
        else:
            seen.add(key)
        out.append(p)
    return out


def normalize(products):
    """
    Preprocessing: clean text, normalize currency, drop unnamed items.
    """
    out = []
    for p in products:
        if not p.get("name"):
            continue
        p["name"] = clean_text(p["name"])
        if p.get("currency") in ["¥", "¥", "元", None]:
            p["currency"] = "CNY"
        # Keep price as float or None
        if isinstance(p.get("price"), str):
            val, cur = parse_price(p["price"])
            p["price"] = val
            p["currency"] = p["currency"] or cur or "CNY"
        out.append(p)
    return out


def save_csv(products, path):
    fields = ["name", "product_url", "image_url", "sku", "brand", "price", "currency",
              "original_price", "discount_rate", "stock_status", "rating", "review_count",
              "description", "source"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        for p in products:
            writer.writerow({k: p.get(k) for k in fields})


def save_json(products, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(products, f, ensure_ascii=False, indent=2)


def main():
    session = setup_session()
    if not is_allowed_by_robots(PROMO_URL, USER_AGENT, session):
        logging.error("robots.txt disallows fetching this URL: %s", PROMO_URL)
        return
    logging.info("Starting fetch: %s", PROMO_URL)
    html = fetch_html(PROMO_URL, session)
    soup = BeautifulSoup(html, "html.parser")
    # Multi-strategy extraction
    products = []
    products += extract_jsonld_products(soup, PROMO_URL)
    products += extract_microdata_products(soup, PROMO_URL)
    # Card parsing
    cards = find_product_cards(soup)
    logging.info("Candidate product cards found: %d", len(cards))
    for card in cards:
        p = parse_product_card(card, PROMO_URL)
        # Require at least a name or a link
        if p.get("name") or p.get("product_url"):
            products.append(p)
    # Preprocess and deduplicate
    products = normalize(products)
    products = deduplicate(products)
    logging.info("Valid products after deduplication: %d", len(products))
    if not products:
        logging.warning("No products extracted; the page may be rendered dynamically or the selectors may need adjustment.")
    else:
        save_csv(products, "promo_summer_2025_products.csv")
        save_json(products, "promo_summer_2025_products.json")
        logging.info("Saved: promo_summer_2025_products.csv, promo_summer_2025_products.json")


if __name__ == "__main__":
    main()
```

Technical notes:
- The parsing strategy prefers structured data (JSON-LD, Microdata) and falls back to generic card selectors with regex extraction, which suits unknown or changing page structures.
- Price parsing is format-tolerant and normalizes the currency (to CNY), which simplifies downstream modeling such as price-distribution or promotion-effect analysis.
- Deduplication keys on SKU and URL first and falls back to an approximate name + price key, reducing duplicate samples in later analysis.
- The output contains the core fields (name, link, image, SKU, price, discount, stock, rating, and so on) and can be loaded directly into a data-analysis pipeline. For further analysis you can impute numeric fields, handle outliers, and segment by discount rate or stock status after loading the data; a brief loading sketch follows below.
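As a companion to the notes above, here is a minimal exploration sketch, not part of the scraper itself. It assumes pandas is installed (pip install pandas) and that promo_summer_2025_products.csv was produced by scrape_promo.py, so the column names match the fields list in save_csv.

```python
# Minimal exploration sketch (assumes: pip install pandas, and that the CSV
# below was produced by scrape_promo.py with the columns defined in save_csv).
import pandas as pd

df = pd.read_csv("promo_summer_2025_products.csv")

# Basic sanity checks: row count and how many prices failed to parse
print(len(df), "products loaded")
print(df["price"].isna().sum(), "products without a parsed price")

# Discount distribution and a simple stock-status breakdown
print(df["discount_rate"].describe())
print(df.groupby("stock_status")["price"].agg(["count", "mean", "median"]))
```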
The following script uses requests and Beautiful Soup to scrape news data from the specified section page, and includes basic data preprocessing (text cleaning, deduplication), pagination and rate control, a robots.txt compliance check, and optional detail-page enrichment. Output is supported in CSV and JSONL, which serves well as input for downstream data mining (feature engineering, modeling).

Notes and assumptions:
- Because the site structure is unknown, the script uses several sets of generic selectors with fallback strategies to extract the title, link, publication time, author, and summary where possible.
- The script checks and respects robots.txt; make sure your scraping complies with the target site's terms of use and applicable laws and regulations.
- If the page relies on JavaScript rendering, switch to a rendering-based approach (such as Playwright/Selenium); this script handles static HTML only.

Usage:
1) Install dependencies: pip install requests beautifulsoup4
2) Run: python scrape_marketing_insights.py
   Optional arguments example: python scrape_marketing_insights.py --url https://news.example.net/industry/marketing-insights --output-csv insights.csv --output-jsonl insights.jsonl --max-pages 3 --detail

Python script (save as scrape_marketing_insights.py):

```python
import argparse
import csv
import json
import logging
import random
import re
import sys
import time
from dataclasses import dataclass, asdict
from typing import List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag

# -----------------------
# Configuration and helpers
# -----------------------

USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0 Safari/537.36"
DEFAULT_URL = "https://news.example.net/industry/marketing-insights"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)


@dataclass
class Article:
    title: str
    url: str
    published_at: Optional[str]
    author: Optional[str]
    summary: Optional[str]
    source: str


def normalize_whitespace(text: Optional[str]) -> Optional[str]:
    if text is None:
        return None
    return re.sub(r"\s+", " ", text).strip()


def build_robot_parser(start_url: str) -> RobotFileParser:
    parsed = urlparse(start_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    rp = RobotFileParser()
    rp.set_url(robots_url)
    try:
        rp.read()
        logging.info(f"robots.txt loaded: {robots_url}")
    except Exception as e:
        logging.warning(f"Failed to load robots.txt: {robots_url} -> {e}")
    return rp


def can_fetch(rp: RobotFileParser, url: str, user_agent: str = USER_AGENT) -> bool:
    # If robots.txt could not be read, RobotFileParser defaults to allowing the fetch
    try:
        return rp.can_fetch(user_agent, url)
    except Exception:
        return True


def get_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({
        "User-Agent": USER_AGENT,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    })
    return s


# -----------------------
# Parsing (list page and detail page)
# -----------------------

def find_article_containers(soup: BeautifulSoup) -> List[Tag]:
    # Combine generic selectors to cover as many site structures as possible
    containers: List[Tag] = []
    containers.extend(soup.find_all("article"))
    containers.extend(soup.select('div[itemtype*="NewsArticle"], div[itemtype*="Article"]'))
    for cls in ["news", "article", "post", "story", "card", "entry"]:
        containers.extend(soup.select(f"div[class*='{cls}'], li[class*='{cls}']"))
    # Approximate dedup by tag name plus source position (sourceline/sourcepos)
    seen_ids = set()
    unique = []
    for tag in containers:
        key = (tag.name, getattr(tag, "sourceline", None), getattr(tag, "sourcepos", None))
        if key not in seen_ids:
            seen_ids.add(key)
            unique.append(tag)
    return unique


def extract_title_and_url(container: Tag, base_url: str) -> Tuple[Optional[str], Optional[str]]:
    # Prefer extracting from heading tags
    title_tag = None
    for sel in ["h1 a", "h2 a", "h3 a", "h1", "h2", "h3", "a"]:
        title_tag = container.select_one(sel)
        if title_tag:
            break
    title = None
    url = None
    if title_tag:
        if title_tag.name == "a":
            title = title_tag.get_text(strip=True) or None
            href = title_tag.get("href")
            if href:
                url = urljoin(base_url, href)
        else:
            title = title_tag.get_text(strip=True) or None
            a = title_tag.find("a", href=True)
            if a and a.get("href"):
                url = urljoin(base_url, a["href"])
    # Fallback: any clickable link
    if not url:
        a = container.find("a", href=True)
        if a and a.get_text(strip=True):
            url = urljoin(base_url, a["href"])
            title = title or a.get_text(strip=True)
    return normalize_whitespace(title), url


def extract_published_at(container: Tag) -> Optional[str]:
    # <time> tag
    t = container.find("time")
    if t:
        dt = t.get("datetime") or t.get_text(strip=True)
        if dt:
            return normalize_whitespace(dt)
    # Structured meta tags
    for sel in [
        'meta[itemprop="datePublished"]',
        'meta[property="article:published_time"]',
        'meta[name="pubdate"]',
        'span[itemprop="datePublished"]',
    ]:
        m = container.select_one(sel)
        if m:
            content = m.get("content") or m.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None


def extract_author(container: Tag) -> Optional[str]:
    for sel in [
        ".author",
        ".byline",
        '[itemprop="author"]',
        'a[rel="author"]',
        'meta[name="author"]',
    ]:
        el = container.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                return normalize_whitespace(content)
    return None


def extract_summary(container: Tag) -> Optional[str]:
    for sel in [".summary", ".dek", ".abstract", "p"]:
        el = container.select_one(sel)
        if el:
            text = el.get_text(" ", strip=True)
            if text and len(text.split()) >= 3:
                return normalize_whitespace(text)
    return None


def parse_detail_page(soup: BeautifulSoup) -> dict:
    # Extract cleaner fields from the detail page, when present
    data = {"title": None, "published_at": None, "author": None, "summary": None}
    # Title
    title_tag = soup.find(["h1", "title"])
    if title_tag:
        data["title"] = normalize_whitespace(title_tag.get_text(strip=True))
    og_title = soup.select_one('meta[property="og:title"]')
    if og_title and og_title.get("content"):
        data["title"] = normalize_whitespace(og_title["content"]) or data["title"]
    # Summary
    og_desc = soup.select_one('meta[property="og:description"], meta[name="description"]')
    if og_desc and og_desc.get("content"):
        data["summary"] = normalize_whitespace(og_desc["content"])
    # Published time
    for sel in [
        'meta[property="article:published_time"]',
        'meta[itemprop="datePublished"]',
        'time[datetime]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") or el.get("datetime") or el.get_text(strip=True)
            if content:
                data["published_at"] = normalize_whitespace(content)
                break
    # Author
    for sel in [
        'meta[name="author"]',
        '[itemprop="author"]',
        '.author',
        '.byline',
        'a[rel="author"]',
    ]:
        el = soup.select_one(sel)
        if el:
            content = el.get("content") if el.name == "meta" else el.get_text(strip=True)
            if content:
                data["author"] = normalize_whitespace(content)
                break
    # JSON-LD (optional enrichment)
    for script in soup.find_all("script", type="application/ld+json"):
        try:
            payload = json.loads(script.string or "")
        except Exception:
            continue
        objs = payload if isinstance(payload, list) else [payload]
        for obj in objs:
            if not isinstance(obj, dict):
                continue  # skip non-dict entries
            t = obj.get("@type") or obj.get("@graph", [{}])[0].get("@type")
            if isinstance(t, list):
                t = ",".join(t)
            if t and ("NewsArticle" in t or "Article" in t):
                data["title"] = normalize_whitespace(obj.get("headline")) or data["title"]
                data["summary"] = normalize_whitespace(obj.get("description")) or data["summary"]
                pub = obj.get("datePublished") or obj.get("dateCreated")
                if pub:
                    data["published_at"] = normalize_whitespace(pub)
                author = obj.get("author")
                if isinstance(author, dict):
                    data["author"] = normalize_whitespace(author.get("name")) or data["author"]
                elif isinstance(author, list) and author:
                    if isinstance(author[0], dict):
                        data["author"] = normalize_whitespace(author[0].get("name")) or data["author"]
                    elif isinstance(author[0], str):
                        data["author"] = normalize_whitespace(author[0]) or data["author"]
                break
    return data


# -----------------------
# Main scraping flow
# -----------------------

def scrape_list(start_url: str, max_pages: int = 1, sleep_min: float = 1.0,
                sleep_max: float = 2.5, fetch_detail: bool = False) -> List[Article]:
    session = get_session()
    rp = build_robot_parser(start_url)
    articles: List[Article] = []
    seen_urls: Set[str] = set()

    def fetch(url: str) -> Optional[BeautifulSoup]:
        if not can_fetch(rp, url):
            logging.warning(f"Disallowed by robots: {url}")
            return None
        try:
            resp = session.get(url, timeout=30)
            if resp.status_code != 200:
                logging.warning(f"Request failed {resp.status_code}: {url}")
                return None
            return BeautifulSoup(resp.text, "html.parser")
        except requests.RequestException as e:
            logging.error(f"Request error: {url} -> {e}")
            return None

    page_url = start_url
    base = f"{urlparse(start_url).scheme}://{urlparse(start_url).netloc}"
    for page_idx in range(max_pages):
        soup = fetch(page_url)
        if soup is None:
            break
        containers = find_article_containers(soup)
        if not containers:
            logging.info("No article containers found; falling back to loose link extraction")
            # Loose extraction: pull links from the main content blocks
            for a in soup.select("main a[href], .content a[href], .container a[href]"):
                title = normalize_whitespace(a.get_text(strip=True))
                if not title or len(title) < 5:
                    continue
                url = urljoin(base, a.get("href"))
                if url in seen_urls:
                    continue
                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=None,
                    author=None,
                    summary=None,
                    source=start_url,
                ))
                seen_urls.add(url)
        else:
            for c in containers:
                title, url = extract_title_and_url(c, base)
                if not url or not title:
                    continue
                if url in seen_urls:
                    continue
                published_at = extract_published_at(c)
                author = extract_author(c)
                summary = extract_summary(c)
                # Optional detail-page enrichment
                if fetch_detail and can_fetch(rp, url):
                    time.sleep(random.uniform(sleep_min, sleep_max))
                    dsoup = fetch(url)
                    if dsoup:
                        detail = parse_detail_page(dsoup)
                        title = detail["title"] or title
                        summary = detail["summary"] or summary
                        published_at = detail["published_at"] or published_at
                        author = detail["author"] or author
                articles.append(Article(
                    title=title,
                    url=url,
                    published_at=published_at,
                    author=author,
                    summary=summary,
                    source=start_url,
                ))
                seen_urls.add(url)
        # Pagination: look for a "next page" link
        next_link = soup.select_one('a[rel="next"], a.next, a[aria-label*="下一页"], a[aria-label*="Next"], li.next a')
        if next_link and next_link.get("href"):
            next_url = urljoin(base, next_link.get("href"))
            if next_url == page_url:
                break
            page_url = next_url
            logging.info(f"Moving to next page: {page_url}")
            time.sleep(random.uniform(sleep_min, sleep_max))
        else:
            break
    return articles


# -----------------------
# Output and CLI
# -----------------------

def write_csv(path: str, items: List[Article]) -> None:
    with open(path, "w", encoding="utf-8", newline="") as f:
        w = csv.DictWriter(f, fieldnames=["title", "url", "published_at", "author", "summary", "source"])
        w.writeheader()
        for it in items:
            w.writerow(asdict(it))


def write_jsonl(path: str, items: List[Article]) -> None:
    with open(path, "w", encoding="utf-8") as f:
        for it in items:
            f.write(json.dumps(asdict(it), ensure_ascii=False) + "\n")


def main():
    parser = argparse.ArgumentParser(description="Marketing Insights scraper (BeautifulSoup)")
    parser.add_argument("--url", type=str, default=DEFAULT_URL, help="Starting list-page URL")
    parser.add_argument("--max-pages", type=int, default=1, help="Maximum number of pages to crawl")
    parser.add_argument("--sleep-min", type=float, default=1.0, help="Minimum delay between requests (seconds)")
    parser.add_argument("--sleep-max", type=float, default=2.5, help="Maximum delay between requests (seconds)")
    parser.add_argument("--detail", action="store_true", help="Fetch detail pages to fill in missing fields")
    parser.add_argument("--output-csv", type=str, default=None, help="CSV output path")
    parser.add_argument("--output-jsonl", type=str, default=None, help="JSONL output path")
    args = parser.parse_args()

    items = scrape_list(
        start_url=args.url,
        max_pages=args.max_pages,
        sleep_min=args.sleep_min,
        sleep_max=args.sleep_max,
        fetch_detail=args.detail,
    )
    logging.info(f"Scraping finished, {len(items)} items collected")

    if args.output_csv:
        write_csv(args.output_csv, items)
        logging.info(f"CSV written: {args.output_csv}")
    if args.output_jsonl:
        write_jsonl(args.output_jsonl, items)
        logging.info(f"JSONL written: {args.output_jsonl}")
    # If no output file is specified, print the first few items as a sample
    if not args.output_csv and not args.output_jsonl:
        for it in items[:5]:
            print(json.dumps(asdict(it), ensure_ascii=False))


if __name__ == "__main__":
    main()
```

Data mining suggestions (processing steps after collection; see the sketch after this list):
- Field standardization: convert published_at to ISO 8601; normalize author names and disambiguate entities.
- Deduplication: run a second pass keyed on URL and on the combination of title + publication time.
- Content quality filtering: summary-length thresholds, keyword filtering (e.g. "marketing", "insights"), and language detection to improve sample purity.
- Structured enrichment: when available, prefer structured data on the page (JSON-LD) to fill missing fields.
- Compliance and rate control: respect robots.txt and site terms; use reasonable delays and concurrency limits to avoid putting pressure on the site.
Quickly scrape competitor campaigns, prices, and copy into readable data that supports campaign strategy and ad-spend optimization.
Collect source data in bulk, paired with cleaning and preprocessing suggestions, to shorten data-preparation cycles and speed up insight delivery.
Aggregate industry news and asset links into a normalized output, improving topic-selection efficiency and content iteration speed.
Monitor platform best-seller lists and review highlights, distill product-optimization directions, and support listing strategy and operations decisions.
Continuously scrape competitors' feature updates and announcement pages into a comparison checklist that supports roadmap planning and positioning adjustments.
Collect public data and literature pages and generate structured summaries with notes for research projects and teaching cases.
Periodically scrape FAQs and update announcements into a structured archive, increasing knowledge-base coverage and self-service resolution rates.
Gather job postings and skill requirements, summarize trends, and provide a basis for talent profiling and training plans.
Turns "a given URL plus a desired output language" into a ready-to-run web-scraping script with clear instructions, covering high-frequency scenarios in growth, operations, product, competitive research, and data analysis. It helps users complete web data collection, basic cleaning, and structuring within minutes, significantly lowering the technical barrier, improving delivery certainty, and providing reliable data input for subsequent analysis, visualization, and reporting.
Copy the prompt generated by the template into your preferred chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify template parameters freely and call it through the interface, making automation and batch processing straightforward. Suitable for developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between AI tools.